diff --git a/README.md b/README.md
index f38043b..ee12b39 100644
--- a/README.md
+++ b/README.md
@@ -69,6 +69,11 @@ const metadata = parquetMetadata(arrayBuffer)
 
 To parse parquet files from a user drag-and-drop action, see example in [index.html](index.html).
 
+## Async
+
+Hyparquet supports asynchronous fetching of parquet files over a network.
+You can provide an `AsyncBuffer`, which is like a js `ArrayBuffer` except that its `slice` method returns a `Promise<ArrayBuffer>`.
+
 ## Supported Parquet Files
 
 The parquet format supports a number of different compression and encoding types.
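
Note: an `AsyncBuffer` backed by http range requests might look like the following.
This is a minimal sketch (it assumes the server supports range requests and that a
`byteLength` property is wanted alongside `slice`; error handling omitted):

```js
const url = 'https://example.com/data.parquet'
const head = await fetch(url, { method: 'HEAD' })
const asyncBuffer = {
  // total size of the remote file, taken from the HEAD response
  byteLength: Number(head.headers.get('Content-Length')),
  // like ArrayBuffer.slice, but async; end is exclusive
  async slice(start, end) {
    const range = end === undefined ? `bytes=${start}-` : `bytes=${start}-${end - 1}`
    const res = await fetch(url, { headers: { range } })
    return res.arrayBuffer()
  },
}
```
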
diff --git a/src/column.js b/src/column.js
index f82cd17..98c5d19 100644
--- a/src/column.js
+++ b/src/column.js
@@ -57,6 +57,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
   const dictionaryEncoding = daph.encoding === Encoding.PLAIN_DICTIONARY || daph.encoding === Encoding.RLE_DICTIONARY
 
   // construct output values: skip nulls and construct lists
+  /** @type {any[]} */
   let values
   if (repetitionLevels.length) {
     // Use repetition levels to construct lists
@@ -73,32 +74,8 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
   } else if (definitionLevels?.length) {
     const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
     // Use definition levels to skip nulls
-    let index = 0
     values = []
-    const decoder = new TextDecoder()
-    for (let i = 0; i < definitionLevels.length; i++) {
-      if (definitionLevels[i] === maxDefinitionLevel) {
-        if (index > dataPage.length) {
-          throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
-        }
-        let v = dataPage[index++]
-
-        // map to dictionary value
-        if (dictionary) {
-          v = dictionary[v]
-          if (v instanceof Uint8Array) {
-            try {
-              v = decoder.decode(v)
-            } catch (e) {
-              console.warn('parquet failed to decode byte array as string', e)
-            }
-          }
-        }
-        values[i] = v
-      } else {
-        values[i] = undefined
-      }
-    }
+    skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, values)
   } else {
     if (dictionaryEncoding && dictionary !== undefined && Array.isArray(dataPage)) {
       // dereference dictionary values
@@ -124,7 +101,9 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       const diph = header.dictionary_page_header
       if (!diph) throw new Error('parquet dictionary page header is undefined')
 
-      const page = decompressPage(compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec)
+      const page = decompressPage(
+        compressedBytes, Number(header.uncompressed_page_size), columnMetadata.codec
+      )
       dictionary = readDictionaryPage(page, diph, schema, columnMetadata)
     } else if (header.type === PageType.DATA_PAGE_V2) {
       throw new Error('parquet data page v2 not supported')
@@ -235,3 +214,41 @@ function decompressPage(compressedBytes, uncompressed_page_size, codec) {
   }
   return page
 }
+
+/**
+ * Expand data page list with nulls and convert to utf8.
+ * @param {number[]} definitionLevels
+ * @param {number} maxDefinitionLevel
+ * @param {ArrayLike<any>} dataPage
+ * @param {any} dictionary
+ * @param {any[]} output
+ */
+function skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, output) {
+  if (output.length) throw new Error('parquet output array is not empty')
+  // Use definition levels to skip nulls
+  let index = 0
+  const decoder = new TextDecoder()
+  for (let i = 0; i < definitionLevels.length; i++) {
+    if (definitionLevels[i] === maxDefinitionLevel) {
+      if (index >= dataPage.length) {
+        throw new Error(`parquet index ${index} exceeds data page length ${dataPage.length}`)
+      }
+      let v = dataPage[index++]
+
+      // map to dictionary value
+      if (dictionary) {
+        v = dictionary[v]
+        if (v instanceof Uint8Array) {
+          try {
+            v = decoder.decode(v)
+          } catch (e) {
+            console.warn('parquet failed to decode byte array as string', e)
+          }
+        }
+      }
+      output[i] = v
+    } else {
+      output[i] = undefined
+    }
+  }
+}
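
Note: for intuition, `skipNulls` consumes a data page value only at positions whose
definition level equals the max definition level; every other position becomes
`undefined`. A standalone sketch of the core loop (dictionary lookup omitted):

```js
// definition levels for ['a', null, 'b'] with maxDefinitionLevel = 1
const definitionLevels = [1, 0, 1]
const dataPage = ['a', 'b'] // nulls are not stored in the data page
const output = []
let index = 0
for (let i = 0; i < definitionLevels.length; i++) {
  output[i] = definitionLevels[i] === 1 ? dataPage[index++] : undefined
}
// output: ['a', undefined, 'b']
```
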
diff --git a/src/datapage.js b/src/datapage.js
index 606ff22..881d495 100644
--- a/src/datapage.js
+++ b/src/datapage.js
@@ -11,7 +11,8 @@ import {
 const skipNulls = false // TODO
 
 /**
- * @typedef {{ definitionLevels: number[] | undefined, repetitionLevels: number[], value: ArrayLike<any> }} DataPage
+ * @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels
+ * @typedef {import("./types.d.ts").DataPage} DataPage
  * @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
  * @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
  * @typedef {import("./types.d.ts").DictionaryPageHeader} DictionaryPageHeader
@@ -34,7 +35,7 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
   const dataView = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)
   let offset = 0
-  /** @type {ArrayLike<any>} */
+  /** @type {any[]} */
   let values = []
 
   // repetition levels
@@ -47,23 +48,24 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
   let definitionLevels = undefined
   let numNulls = 0
   // let maxDefinitionLevel = -1
+  // TODO: move into readDefinitionLevels
   if (skipNulls && !isRequired(schema, columnMetadata.path_in_schema)) {
     // skip_definition_bytes
     offset += skipDefinitionBytes(daph.num_values)
   } else {
-    const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata)
+    const dl = readDefinitionLevels(dataView, offset, daph, schema, columnMetadata.path_in_schema)
     definitionLevels = dl.definitionLevels
     numNulls = dl.numNulls
     offset += dl.byteLength
   }
 
   // read values based on encoding
-  const nval = daph.num_values - numNulls
+  const nValues = daph.num_values - numNulls
   if (daph.encoding === Encoding.PLAIN) {
     const se = schemaElement(schema, columnMetadata.path_in_schema)
     const utf8 = se.converted_type === 'UTF8'
-    const plainObj = readPlain(dataView, columnMetadata.type, nval, offset, utf8)
-    values = plainObj.value
+    const plainObj = readPlain(dataView, columnMetadata.type, nValues, offset, utf8)
+    values = Array.isArray(plainObj.value) ? plainObj.value : Array.from(plainObj.value)
     offset += plainObj.byteLength
   } else if (
     daph.encoding === Encoding.PLAIN_DICTIONARY ||
@@ -81,13 +83,13 @@ export function readDataPage(bytes, daph, schema, columnMetadata) {
     }
     if (bitWidth) {
       const { value, byteLength } = readRleBitPackedHybrid(
-        dataView, offset, bitWidth, dataView.byteLength - offset, nval
+        dataView, offset, bitWidth, dataView.byteLength - offset, nValues
       )
       offset += byteLength
-      values = value
+      values = Array.isArray(value) ? value : Array.from(value)
     } else {
-      // nval zeros
-      values = new Array(nval).fill(0)
+      // nValues zeros
+      values = new Array(nValues).fill(0)
     }
   } else {
     throw new Error(`parquet unsupported encoding: ${daph.encoding}`)
@@ -136,8 +138,6 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
   return { value: [], byteLength: 0 }
 }
 
-/** @typedef {{ byteLength: number, definitionLevels: number[], numNulls: number }} DefinitionLevels */
-
 /**
  * Read the definition levels from this page, if any.
  *
@@ -145,12 +145,12 @@ function readRepetitionLevels(dataView, offset, daph, schema, columnMetadata) {
  * @param {DataView} dataView dataView to read from
  * @param {number} offset offset to start reading from
  * @param {DataPageHeader} daph data page header
  * @param {SchemaElement[]} schema schema for the file
- * @param {ColumnMetaData} columnMetadata metadata for the column
+ * @param {string[]} path_in_schema path in the schema
  * @returns {DefinitionLevels} definition levels and number of bytes read
  */
-function readDefinitionLevels(dataView, offset, daph, schema, columnMetadata) {
-  if (!isRequired(schema, columnMetadata.path_in_schema)) {
-    const maxDefinitionLevel = getMaxDefinitionLevel(schema, columnMetadata.path_in_schema)
+function readDefinitionLevels(dataView, offset, daph, schema, path_in_schema) {
+  if (!isRequired(schema, path_in_schema)) {
+    const maxDefinitionLevel = getMaxDefinitionLevel(schema, path_in_schema)
     const bitWidth = widthFromMaxInt(maxDefinitionLevel)
     if (bitWidth) {
       // num_values is index 1 for either type of page header
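
Note: the repeated `Array.isArray(value) ? value : Array.from(value)` normalization
exists because decoders may return a typed array (see `DecodedArray` in types.d.ts
below), and later steps need a plain array, since a typed array cannot hold
`undefined` for nulls:

```js
const value = new Int32Array([1, 2, 3])
const values = Array.isArray(value) ? value : Array.from(value)
values[1] = undefined // fine on a plain array; an Int32Array would coerce this to 0
```
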
diff --git a/src/encoding.js b/src/encoding.js
index afcf2c8..cde1d53 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -149,12 +149,13 @@ function readPlainByteArrayFixed(dataView, offset, fixedLength) {
 /**
  * Read `count` values of the given type from the dataView.
  *
+ * @typedef {import("./types.d.ts").DecodedArray} DecodedArray
  * @param {DataView} dataView - buffer to read data from
 * @param {number} type - parquet type of the data
 * @param {number} count - number of values to read
 * @param {number} offset - offset to start reading from the DataView
 * @param {boolean} utf8 - whether to decode byte arrays as UTF-8
- * @returns {Decoded<ArrayLike<any>>} array of values
+ * @returns {Decoded<DecodedArray>} array of values
 */
 export function readPlain(dataView, type, count, offset, utf8) {
   if (count === 0) return { value: [], byteLength: 0 }
@@ -323,7 +324,13 @@ function readBitPacked(dataView, offset, header, bitWidth, remaining) {
   let count = (header >> 1) << 3
   const mask = maskForBits(bitWidth)
 
-  let data = dataView.getUint8(offset)
+  // Sometimes this reads past the end of the buffer, but the extra bits are masked out anyway
+  let data = 0
+  if (offset < dataView.byteLength) {
+    data = dataView.getUint8(offset)
+  } else if (mask) {
+    throw new Error(`parquet bitpack offset ${offset} out of range`)
+  }
   let byteLength = 1
   let left = 8
   let right = 0
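
Note: the out-of-range guard above works because a zero `bitWidth` yields a zero
mask, in which case every decoded value is zero and the byte past the end of the
buffer is never actually needed. Assuming `maskForBits` has the usual definition:

```js
const maskForBits = bits => (1 << bits) - 1
maskForBits(0) // 0: all values decode to 0, so a missing trailing byte is harmless
maskForBits(3) // 7: real bits are needed, so reading past the end should throw
```
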
diff --git a/src/header.js b/src/header.js
index 7ba54cf..d0642d1 100644
--- a/src/header.js
+++ b/src/header.js
@@ -52,7 +52,7 @@ export function parquetHeader(arrayBuffer, offset) {
     encoding: header.field_8.field_4,
     definition_levels_byte_length: header.field_8.field_5,
     repetition_levels_byte_length: header.field_8.field_6,
-    is_compressed: header.field_8.field_7,
+    is_compressed: header.field_8.field_7 === undefined ? true : header.field_8.field_7, // default to true
     statistics: header.field_8.field_8,
   }
diff --git a/src/read.js b/src/read.js
index 64bace5..f1a22c1 100644
--- a/src/read.js
+++ b/src/read.js
@@ -138,7 +138,7 @@ async function readRowGroup(options, rowGroup) {
       arrayBuffer, bufferOffset, rowGroup, columnMetadata, metadata.schema
     )
     if (columnData.length !== Number(rowGroup.num_rows)) {
-      throw new Error('parquet column length does not match row group length')
+      throw new Error(`parquet column length ${columnData.length} does not match row group length ${rowGroup.num_rows}`)
     }
     // notify caller of column data
     if (options.onChunk) options.onChunk({ column: columnIndex, data: columnData, rowStart: 0, rowEnd: columnData.length })
diff --git a/src/types.d.ts b/src/types.d.ts
index 21e8738..c4afa44 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -212,3 +212,11 @@ interface DataPageHeaderV2 {
   is_compressed?: boolean
   statistics?: Statistics
 }
+
+type DecodedArray = any[] | Uint8Array
+
+interface DataPage {
+  definitionLevels: number[] | undefined
+  repetitionLevels: number[]
+  value: any[]
+}
diff --git a/test/snappy.test.js b/test/snappy.test.js
index 4396770..f199836 100644
--- a/test/snappy.test.js
+++ b/test/snappy.test.js
@@ -4,20 +4,26 @@ import { snappyUncompress } from '../src/snappy.js'
 describe('snappy uncompress', () => {
   it('decompresses valid input correctly', async () => {
     const testCases = [
-      { compressed: new Uint8Array([0x00]), expected: '' },
-      { compressed: new Uint8Array([0x01, 0x00, 0x68]), expected: 'h' },
-      { compressed: new Uint8Array([0x02, 0x04, 0x68, 0x79]), expected: 'hy' },
-      { compressed: new Uint8Array([0x03, 0x08, 0x68, 0x79, 0x70]), expected: 'hyp' },
-      { compressed: new Uint8Array([0x05, 0x10, 0x68, 0x79, 0x70, 0x65, 0x72]), expected: 'hyper' },
-      { compressed: new Uint8Array([0x0a, 0x24, 0x68, 0x79, 0x70, 0x65, 0x72, 0x70, 0x61, 0x72, 0x61, 0x6d]), expected: 'hyperparam' },
-      { compressed: new Uint8Array([0x15, 0x08, 0x68, 0x79, 0x70, 0x46, 0x03, 0x00]), expected: 'hyphyphyphyphyphyphyp' },
+      { compressed: [0x00], expected: '' },
+      { compressed: [0x01, 0x00, 0x68], expected: 'h' },
+      { compressed: [0x02, 0x04, 0x68, 0x79], expected: 'hy' },
+      { compressed: [0x03, 0x08, 0x68, 0x79, 0x70], expected: 'hyp' },
+      { compressed: [0x05, 0x10, 0x68, 0x79, 0x70, 0x65, 0x72], expected: 'hyper' },
+      {
+        compressed: [0x0a, 0x24, 0x68, 0x79, 0x70, 0x65, 0x72, 0x70, 0x61, 0x72, 0x61, 0x6d],
+        expected: 'hyperparam',
+      },
+      {
+        compressed: [0x15, 0x08, 0x68, 0x79, 0x70, 0x46, 0x03, 0x00],
+        expected: 'hyphyphyphyphyphyphyp',
+      },
       { // from rowgroups.parquet
-        compressed: new Uint8Array([
+        compressed: [
           80, 4, 1, 0, 9, 1, 0, 2, 9, 7, 4, 0, 3, 13, 8, 0, 4, 13, 8,
           0, 5, 13, 8, 0, 6, 13, 8, 0, 7, 13, 8, 0, 8, 13, 8, 60, 9, 0,
           0, 0, 0, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0,
-        ]),
+        ],
         expected: new Uint8Array([
           1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0,
           0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0,
           0, 0, 6, 0, 0, 0, 0, 0,
@@ -25,16 +31,19 @@ describe('snappy uncompress', () => {
           0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0,
         ]),
       },
+      // from datapage_v2.snappy.parquet
+      { compressed: [2, 4, 0, 3], expected: new Uint8Array([0, 3]) },
+      { compressed: [6, 20, 2, 0, 0, 0, 3, 23], expected: new Uint8Array([2, 0, 0, 0, 3, 23]) },
     ]
 
     const futures = testCases.map(async ({ compressed, expected }) => {
-      const outputArray = new Uint8Array(expected.length)
-      await snappyUncompress(compressed, outputArray)
+      const output = new Uint8Array(expected.length)
+      await snappyUncompress(new Uint8Array(compressed), output)
       if (typeof expected === 'string') {
-        const outputStr = new TextDecoder().decode(outputArray)
+        const outputStr = new TextDecoder().decode(output)
         expect(outputStr).toBe(expected)
       } else {
-        expect(outputArray).toEqual(expected) // Uint8Array
+        expect(output).toEqual(expected) // Uint8Array
       }
     })
 
@@ -42,16 +51,16 @@ describe('snappy uncompress', () => {
   })
 
   it('throws for invalid input', () => {
-    const outputArray = new Uint8Array(10)
-    expect(() => snappyUncompress(new Uint8Array([]), outputArray))
+    const output = new Uint8Array(10)
+    expect(() => snappyUncompress(new Uint8Array([]), output))
       .toThrow('invalid snappy length header')
-    expect(() => snappyUncompress(new Uint8Array([0xff]), outputArray))
+    expect(() => snappyUncompress(new Uint8Array([0xff]), output))
       .toThrow('invalid snappy length header')
-    expect(() => snappyUncompress(new Uint8Array([0x03, 0x61]), outputArray))
+    expect(() => snappyUncompress(new Uint8Array([0x03, 0x61]), output))
       .toThrow('missing eof marker')
-    expect(() => snappyUncompress(new Uint8Array([0x03, 0xf1]), outputArray))
+    expect(() => snappyUncompress(new Uint8Array([0x03, 0xf1]), output))
      .toThrow('missing eof marker')
-    expect(() => snappyUncompress(new Uint8Array([0x02, 0x00, 0x68]), outputArray))
+    expect(() => snappyUncompress(new Uint8Array([0x02, 0x00, 0x68]), output))
       .toThrow('premature end of input')
   })
 })
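
Note: the new test vectors follow the snappy wire format: a varint preamble giving
the uncompressed length, then tagged elements, where the low two bits of each tag
select literal vs copy. Decoding the `'hyp'` case by hand:

```js
const compressed = [0x03, 0x08, 0x68, 0x79, 0x70]
const length = compressed[0] // varint preamble: uncompressed length = 3
const tag = compressed[1] // low 2 bits 00 = literal, length = (tag >> 2) + 1 = 3
const literal = compressed.slice(2, 2 + (tag >> 2) + 1)
console.log(String.fromCharCode(...literal)) // 'hyp' (3 bytes, matching length)
```
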