diff --git a/.gitignore b/.gitignore index e6dc62e..de1c185 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ package-lock.json coverage *.tgz .vscode +.DS_Store /*.parquet diff --git a/src/assemble.js b/src/assemble.js index 90f65dd..fd29dac 100644 --- a/src/assemble.js +++ b/src/assemble.js @@ -158,7 +158,8 @@ export function assembleNested(subcolumnData, schema, depth = 0) { subcolumnData.delete(child.path.join('.')) } // invert struct by depth - const inverted = invertStruct(struct, nextDepth) + const invertDepth = schema.element.repetition_type === 'REQUIRED' ? depth : depth + 1 + const inverted = invertStruct(struct, invertDepth) if (optional) flattenAtDepth(inverted, depth) subcolumnData.set(path, inverted) return diff --git a/src/byteStreamSplit.js b/src/byteStreamSplit.js deleted file mode 100644 index c308fce..0000000 --- a/src/byteStreamSplit.js +++ /dev/null @@ -1,14 +0,0 @@ -/** - * @param {import('./types.d.ts').DataReader} reader - * @param {number} nValues - * @param {Float32Array | Float64Array} output - */ -export function byteStreamSplit(reader, nValues, output) { - const byteWidth = output instanceof Float32Array ? 4 : 8 - const bytes = new Uint8Array(output.buffer) - for (let b = 0; b < byteWidth; b++) { - for (let i = 0; i < nValues; i++) { - bytes[i * byteWidth + b] = reader.view.getUint8(reader.offset++) - } - } -} diff --git a/src/datapage.js b/src/datapage.js index eaa197c..0593060 100644 --- a/src/datapage.js +++ b/src/datapage.js @@ -1,5 +1,4 @@ -import { byteStreamSplit } from './byteStreamSplit.js' -import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' +import { byteStreamSplit, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' import { readPlain } from './plain.js' import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired } from './schema.js' diff --git a/src/datapageV2.js b/src/datapageV2.js index 33bcea9..3e46689 100644 --- a/src/datapageV2.js +++ b/src/datapageV2.js @@ -1,7 +1,6 @@ -import { byteStreamSplit } from './byteStreamSplit.js' import { decompressPage } from './column.js' import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js' -import { readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' +import { byteStreamSplit, readRleBitPackedHybrid, widthFromMaxInt } from './encoding.js' import { readPlain } from './plain.js' import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js' diff --git a/src/encoding.js b/src/encoding.js index 4fdad0f..d3f8e00 100644 --- a/src/encoding.js +++ b/src/encoding.js @@ -125,3 +125,18 @@ function readBitPacked(reader, header, bitWidth, values, seen) { return seen } + +/** + * @param {DataReader} reader + * @param {number} nValues + * @param {Float32Array | Float64Array} output + */ +export function byteStreamSplit(reader, nValues, output) { + const byteWidth = output instanceof Float32Array ? 4 : 8 + const bytes = new Uint8Array(output.buffer) + for (let b = 0; b < byteWidth; b++) { + for (let i = 0; i < nValues; i++) { + bytes[i * byteWidth + b] = reader.view.getUint8(reader.offset++) + } + } +} diff --git a/src/read.js b/src/read.js index 66558f8..3f483fa 100644 --- a/src/read.js +++ b/src/read.js @@ -38,9 +38,8 @@ export async function parquetRead(options) { options.metadata ||= await parquetMetadataAsync(options.file) if (!options.metadata) throw new Error('parquet metadata not found') - const { metadata, onComplete } = options + const { metadata, onComplete, rowEnd } = options const rowStart = options.rowStart || 0 - const rowEnd = options.rowEnd || Number(metadata.num_rows) /** @type {any[][]} */ const rowData = [] @@ -50,13 +49,13 @@ export async function parquetRead(options) { // number of rows in this row group const groupRows = Number(rowGroup.num_rows) // if row group overlaps with row range, read it - if (groupStart + groupRows >= rowStart && groupStart < rowEnd) { + if (groupStart + groupRows >= rowStart && (rowEnd === undefined || groupStart < rowEnd)) { // read row group const groupData = await readRowGroup(options, rowGroup, groupStart) if (onComplete) { // filter to rows in range const start = Math.max(rowStart - groupStart, 0) - const end = Math.min(rowEnd - groupStart, groupRows) + const end = rowEnd === undefined ? undefined : rowEnd - groupStart concat(rowData, groupData.slice(start, end)) } } diff --git a/test/files/repeated_no_annotation.json b/test/files/repeated_no_annotation.json new file mode 100644 index 0000000..828cda2 --- /dev/null +++ b/test/files/repeated_no_annotation.json @@ -0,0 +1,8 @@ +[ + [1, null], + [2, null], + [3, {"phone": []}], + [4, {"phone": [{"number":5555555555,"kind":null}]}], + [5, {"phone": [{"number":1111111111,"kind":"home"}]}], + [6, {"phone": [{"number":1111111111,"kind":"home"},{"number":2222222222,"kind":null},{"number":3333333333,"kind":"mobile"}]}] +] diff --git a/test/files/repeated_no_annotation.metadata.json b/test/files/repeated_no_annotation.metadata.json new file mode 100644 index 0000000..8609af3 --- /dev/null +++ b/test/files/repeated_no_annotation.metadata.json @@ -0,0 +1,107 @@ +{ + "version": 1, + "schema": [ + { + "name": "user", + "num_children": 2 + }, + { + "type": "INT32", + "repetition_type": "REQUIRED", + "name": "id" + }, + { + "repetition_type": "OPTIONAL", + "name": "phoneNumbers", + "num_children": 1 + }, + { + "repetition_type": "REPEATED", + "name": "phone", + "num_children": 2 + }, + { + "type": "INT64", + "repetition_type": "REQUIRED", + "name": "number" + }, + { + "type": "BYTE_ARRAY", + "repetition_type": "OPTIONAL", + "name": "kind", + "converted_type": "UTF8" + } + ], + "num_rows": 0, + "row_groups": [ + { + "columns": [ + { + "file_offset": 64, + "meta_data": { + "type": "INT32", + "encodings": [ + "PLAIN", + "RLE_DICTIONARY" + ], + "path_in_schema": [ + "id" + ], + "codec": "UNCOMPRESSED", + "num_values": 6, + "total_uncompressed_size": 60, + "total_compressed_size": 60, + "data_page_offset": 42, + "dictionary_page_offset": 4 + } + }, + { + "file_offset": 173, + "meta_data": { + "type": "INT64", + "encodings": [ + "PLAIN", + "RLE_DICTIONARY" + ], + "path_in_schema": [ + "phoneNumbers", + "phone", + "number" + ], + "codec": "UNCOMPRESSED", + "num_values": 8, + "total_uncompressed_size": 80, + "total_compressed_size": 80, + "data_page_offset": 139, + "dictionary_page_offset": 93 + } + }, + { + "file_offset": 294, + "meta_data": { + "type": "BYTE_ARRAY", + "encodings": [ + "PLAIN", + "RLE_DICTIONARY" + ], + "path_in_schema": [ + "phoneNumbers", + "phone", + "kind" + ], + "codec": "UNCOMPRESSED", + "num_values": 8, + "total_uncompressed_size": 65, + "total_compressed_size": 65, + "data_page_offset": 261, + "dictionary_page_offset": 229 + } + } + ], + "total_byte_size": 205, + "num_rows": 6 + } + ], + "created_by": "parquet-rs version 0.3.0 (build b45ce7cba2199f22d93269c150d8a83916c69b5e)", + "metadata_length": 306 +} diff --git a/test/files/repeated_no_annotation.parquet b/test/files/repeated_no_annotation.parquet new file mode 100644 index 0000000..02f20a6 Binary files /dev/null and b/test/files/repeated_no_annotation.parquet differ