Convert byte arrays to utf8 by default

Kenny Daniel 2024-05-22 22:24:54 -07:00
parent 1f8289b4b2
commit c4ad05e580
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 58 additions and 55 deletions

@@ -7,44 +7,33 @@ import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
 import { snappyUncompress } from './snappy.js'
 import { concat } from './utils.js'
 /**
- * @typedef {import('./types.js').SchemaTree} SchemaTree
- * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
- * @typedef {import('./types.js').Compressors} Compressors
- * @typedef {import('./types.js').RowGroup} RowGroup
- */
-/**
  * Parse column data from a buffer.
  *
- * @param {ArrayBuffer} arrayBuffer parquet file contents
- * @param {number} columnOffset offset to start reading from
- * @param {RowGroup} rowGroup row group metadata
+ * @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
+ * @param {import('./types.js').DataReader} reader
+ * @param {import('./types.js').RowGroup} rowGroup row group metadata
  * @param {ColumnMetaData} columnMetadata column metadata
- * @param {SchemaTree[]} schemaPath schema path for the column
- * @param {Compressors} [compressors] custom decompressors
+ * @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column
+ * @param {import('./hyparquet.js').ParquetReadOptions} options read options
  * @returns {any[]} array of values
  */
-export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata, schemaPath, compressors) {
+export function readColumn(reader, rowGroup, columnMetadata, schemaPath, { compressors, utf8 }) {
+  const { element } = schemaPath[schemaPath.length - 1]
   /** @type {ArrayLike<any> | undefined} */
   let dictionary = undefined
   let seen = 0
   /** @type {any[]} */
   const rowData = []
-  const { element } = schemaPath[schemaPath.length - 1]
-  // column reader:
-  const reader = { view: new DataView(arrayBuffer, columnOffset), offset: 0 }
   while (seen < rowGroup.num_rows) {
     // parse column header
     const header = parquetHeader(reader)
-    if (header.compressed_page_size === undefined) {
-      throw new Error('parquet compressed page size is undefined')
-    }
+    // assert(header.compressed_page_size !== undefined)
     // read compressed_page_size bytes starting at offset
     const compressedBytes = new Uint8Array(
-      arrayBuffer, columnOffset + reader.offset, header.compressed_page_size
+      reader.view.buffer, reader.view.byteOffset + reader.offset, header.compressed_page_size
     )
     // parse page data by type
@@ -54,16 +43,14 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       const daph = header.data_page_header
       if (!daph) throw new Error('parquet data page header is undefined')
-      const page = decompressPage(
-        compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors
-      )
+      const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec, compressors)
       const { definitionLevels, repetitionLevels, dataPage } = readDataPage(page, daph, schemaPath, columnMetadata)
       seen += daph.num_values
       // assert(!daph.statistics || daph.statistics.null_count === BigInt(daph.num_values - dataPage.length))
       // construct output values: skip nulls and construct lists
       dereferenceDictionary(dictionary, dataPage)
-      values = convert(dataPage, element)
+      values = convert(dataPage, element, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         // Use repetition levels to construct lists
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
@@ -92,7 +79,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       seen += daph2.num_values
       dereferenceDictionary(dictionary, dataPage)
-      values = convert(dataPage, element)
+      values = convert(dataPage, element, utf8)
       if (repetitionLevels.length || definitionLevels?.length) {
         // Use repetition levels to construct lists
         const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
@@ -155,7 +142,7 @@ export function getColumnOffset({ dictionary_page_offset, data_page_offset }) {
  * @param {Uint8Array} compressedBytes
  * @param {number} uncompressed_page_size
  * @param {import('./types.js').CompressionCodec} codec
- * @param {Compressors | undefined} compressors
+ * @param {import('./types.js').Compressors | undefined} compressors
  * @returns {Uint8Array}
  */
 export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
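
Taken together, these readColumn hunks change its contract: instead of an ArrayBuffer plus a byte offset, callers now hand in a DataReader (a DataView plus a mutable cursor), and page bytes are addressed via reader.view.byteOffset + reader.offset, so the view need not start at byte zero of the underlying buffer. A minimal sketch of the new calling convention, assuming arrayBuffer, columnOffset (the chunk's starting byte), rowGroup, columnMetadata, and schemaPath are already in hand:

    // DataReader: a DataView over the file bytes plus a mutable cursor
    const reader = { view: new DataView(arrayBuffer), offset: columnOffset }
    // the last argument is the read options bag; readColumn destructures
    // { compressors, utf8 } from it and ignores the rest
    const values = readColumn(reader, rowGroup, columnMetadata, schemaPath, { utf8: true })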

@@ -6,18 +6,11 @@ const dayMillis = 86400000 // 1 day in milliseconds
  * @typedef {import('./types.js').DecodedArray} DecodedArray
  * @param {DecodedArray} data series of primitive types
  * @param {import('./types.js').SchemaElement} schemaElement schema element for the data
+ * @param {boolean | undefined} utf8 decode bytes as utf8?
  * @returns {DecodedArray} series of rich types
  */
-export function convert(data, schemaElement) {
+export function convert(data, schemaElement, utf8 = true) {
   const ctype = schemaElement.converted_type
-  if (ctype === 'UTF8') {
-    const decoder = new TextDecoder()
-    const arr = new Array(data.length)
-    for (let i = 0; i < arr.length; i++) {
-      arr[i] = data[i] && decoder.decode(data[i])
-    }
-    return arr
-  }
   if (ctype === 'DECIMAL') {
     const scale = schemaElement.scale || 0
     const factor = Math.pow(10, -scale)
@@ -50,12 +43,19 @@ export function convert(data, schemaElement) {
   if (ctype === 'INTERVAL') {
     throw new Error('parquet interval not supported')
   }
+  if (ctype === 'UTF8' || utf8 && schemaElement.type === 'BYTE_ARRAY') {
+    const decoder = new TextDecoder()
+    const arr = new Array(data.length)
+    for (let i = 0; i < arr.length; i++) {
+      arr[i] = data[i] && decoder.decode(data[i])
+    }
+    return arr
+  }
   // TODO: ctype UINT
   const logicalType = schemaElement.logical_type?.type
   if (logicalType === 'FLOAT16') {
     return Array.from(data).map(parseFloat16)
   }
   // TODO: logical types
   return data
 }
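
The UTF8 decode branch moves below the other converted types and now also fires for plain BYTE_ARRAY columns whenever utf8 is set (the default). Since && binds tighter than ||, the condition reads as ctype === 'UTF8' || (utf8 && schemaElement.type === 'BYTE_ARRAY'), so columns explicitly annotated UTF8 are decoded even when utf8 is false. A quick sketch of the three resulting behaviors (the schema element names here are illustrative):

    const bytes = [new TextEncoder().encode('abc')]
    // converted_type UTF8: always decoded, regardless of the utf8 flag
    convert(bytes, { name: 'a', converted_type: 'UTF8' })    // ['abc']
    // plain BYTE_ARRAY: decoded by default...
    convert(bytes, { name: 'a', type: 'BYTE_ARRAY' })        // ['abc']
    // ...but preserved as raw bytes when utf8 is false
    convert(bytes, { name: 'a', type: 'BYTE_ARRAY' }, false) // [Uint8Array [97, 98, 99]]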

src/hyparquet.d.ts vendored

@@ -100,6 +100,7 @@ export interface ParquetReadOptions {
   onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
   onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
   compressors?: Compressors // custom decompressors
+  utf8?: boolean // decode byte arrays as utf8 strings (default true)
 }
 /**
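
With the option declared on ParquetReadOptions, callers can opt out of string decoding at the top level. A usage sketch (obtaining the file AsyncBuffer is assumed and not part of this diff; the option names come from the interface above):

    import { parquetRead } from 'hyparquet'

    await parquetRead({
      file, // AsyncBuffer with the parquet bytes (obtained elsewhere)
      utf8: false, // keep BYTE_ARRAY values as raw Uint8Arrays
      onComplete: rows => console.log(rows),
    })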

@@ -81,7 +81,7 @@ export async function parquetRead(options) {
  * @returns {Promise<any[][]>} resolves to row data
  */
 async function readRowGroup(options, rowGroup, groupStart) {
-  const { file, metadata, columns, compressors } = options
+  const { file, metadata, columns } = options
   if (!metadata) throw new Error('parquet metadata not found')
   // loop through metadata to find min/max bytes to read
@@ -151,10 +151,9 @@ async function readRowGroup(options, rowGroup, groupStart) {
     // read column data async
     promises.push(buffer.then(arrayBuffer => {
       const schemaPath = getSchemaPath(metadata.schema, columnMetadata.path_in_schema)
+      const reader = { view: new DataView(arrayBuffer), offset: bufferOffset }
       /** @type {any[] | undefined} */
-      let columnData = readColumn(
-        arrayBuffer, bufferOffset, rowGroup, columnMetadata, schemaPath, compressors
-      )
+      let columnData = readColumn(reader, rowGroup, columnMetadata, schemaPath, options)
       // assert(columnData.length === Number(rowGroup.num_rows)
       // TODO: fast path for non-nested columns
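
Design note: readRowGroup now forwards the entire options object rather than plucking compressors out, so a new read option like utf8 reaches readColumn with no extra per-field plumbing. This scales to later options because readColumn's destructuring simply ignores keys it does not use (futureOption below is hypothetical, not part of this diff):

    // readColumn destructures only { compressors, utf8 }; unknown keys are ignored
    readColumn(reader, rowGroup, columnMetadata, schemaPath, { utf8: false, futureOption: true })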

@@ -13,11 +13,27 @@ describe('convert function', () => {
     expect(convert(data, schemaElement)).toEqual(data)
   })
-  it('converts byte arrays to UTF8 strings', () => {
-    const data = [new TextEncoder().encode('test'), new TextEncoder().encode('vitest')]
+  it('converts byte arrays to utf8', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
     /** @type {SchemaElement} */
     const schemaElement = { name, converted_type: 'UTF8' }
-    expect(convert(data, schemaElement)).toEqual(['test', 'vitest'])
+    expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
   })
+  it('converts byte arrays to utf8 default true', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
+    /** @type {SchemaElement} */
+    const schemaElement = { name, type: 'BYTE_ARRAY' }
+    expect(convert(data, schemaElement)).toEqual(['foo', 'bar'])
+  })
+  it('preserves byte arrays utf8=false', () => {
+    const data = [new TextEncoder().encode('foo'), new TextEncoder().encode('bar')]
+    /** @type {SchemaElement} */
+    const schemaElement = { name, type: 'BYTE_ARRAY' }
+    expect(convert(data, schemaElement, false)).toEqual([
+      new Uint8Array([102, 111, 111]), new Uint8Array([98, 97, 114]),
+    ])
+  })
   it('converts numbers to DECIMAL', () => {

@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]

@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]

@@ -1,22 +1,22 @@
 [
   [
     1593604800,
-    [97, 98, 99],
+    "abc",
     42
   ],
   [
     1593604800,
-    [100, 101, 102],
+    "def",
     7.7
   ],
   [
     1593604801,
-    [97, 98, 99],
+    "abc",
     42.125
   ],
   [
     1593604801,
-    [100, 101, 102],
+    "def",
     7.7
   ]
 ]