From 6ffdeca1038d364f72ca53f261c38ba42fc9cc4e Mon Sep 17 00:00:00 2001
From: Kenny Daniel
Date: Sun, 7 Apr 2024 09:33:57 -0700
Subject: [PATCH] Fast array concat

---
 src/column.js               |  9 +++++----
 src/encoding.js             | 11 ++++++-----
 src/hyparquet.js            |  2 +-
 src/read.js                 |  5 +++--
 src/{toJson.js => utils.js} | 12 ++++++++++++
 test/metadata.test.js       |  2 +-
 test/read.test.js           |  2 +-
 7 files changed, 29 insertions(+), 14 deletions(-)
 rename src/{toJson.js => utils.js} (72%)

diff --git a/src/column.js b/src/column.js
index c589c35..3f9d27f 100644
--- a/src/column.js
+++ b/src/column.js
@@ -6,6 +6,7 @@ import { readDataPageV2 } from './datapageV2.js'
 import { parquetHeader } from './header.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement } from './schema.js'
 import { snappyUncompress } from './snappy.js'
+import { concat } from './utils.js'
 
 /**
  * @typedef {import('./types.js').SchemaElement} SchemaElement
@@ -31,7 +32,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
   let valuesSeen = 0
   let byteOffset = 0 // byteOffset within the column
   /** @type {any[]} */
-  let rowData = []
+  const rowData = []
 
   while (valuesSeen < rowGroup.num_rows) {
     // parse column header
@@ -93,7 +94,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 
       // values.length !== daph.num_values isn't right. In cases like arrays,
      // you need the total number of children, not the number of top-level values.
-      rowData = rowData.concat(values)
+      concat(rowData, values)
     } else if (header.type === PageType.DICTIONARY_PAGE) {
       const diph = header.dictionary_page_header
       if (!diph) throw new Error('parquet dictionary page header is undefined')
@@ -116,7 +117,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
       if (repetitionLevels.length) {
         dereferenceDictionary(dictionary, dataPage)
         // Use repetition levels to construct lists
-        rowData = rowData.concat(assembleObjects(
+        concat(rowData, assembleObjects(
           definitionLevels, repetitionLevels, dataPage, true, maxDefinitionLevel, maxRepetitionLevel
         ))
       } else if (daph2.num_nulls) {
@@ -125,7 +126,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
         skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
       } else {
         dereferenceDictionary(dictionary, dataPage)
-        rowData = rowData.concat(dataPage)
+        concat(rowData, dataPage)
       }
       // TODO: convert?
     } else {
diff --git a/src/encoding.js b/src/encoding.js
index 50a00a4..81243ff 100644
--- a/src/encoding.js
+++ b/src/encoding.js
@@ -1,4 +1,5 @@
 import { readVarInt } from './thrift.js'
+import { concat } from './utils.js'
 
 /**
  * Return type with bytes read.
@@ -212,14 +213,14 @@ export function widthFromMaxInt(value) {
  */
 export function readData(dataView, encoding, offset, count, bitWidth) {
   /** @type {any[]} */
-  let value = []
+  const value = []
   let byteLength = 0
   if (encoding === 'RLE') {
     let seen = 0
     while (seen < count) {
       const rle = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
       if (!rle.value.length) break // EOF
-      value = value.concat(rle.value)
+      concat(value, rle.value)
       seen += rle.value.length
       byteLength += rle.byteLength
     }
@@ -249,7 +250,7 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValues) {
     byteLength += 4
   }
   /** @type {number[]} */
-  let value = []
+  const value = []
   const startByteLength = byteLength
   while (byteLength - startByteLength < length && value.length < numValues) {
     const [header, newOffset] = readVarInt(dataView, offset + byteLength)
@@ -257,14 +258,14 @@
     if ((header & 1) === 0) {
       // rle
       const rle = readRle(dataView, offset + byteLength, header, width)
-      value = value.concat(rle.value)
+      concat(value, rle.value)
       byteLength += rle.byteLength
     } else {
       // bit-packed
       const bitPacked = readBitPacked(
         dataView, offset + byteLength, header, width, numValues - value.length
       )
-      value = value.concat(bitPacked.value)
+      concat(value, bitPacked.value)
       byteLength += bitPacked.byteLength
     }
   }
diff --git a/src/hyparquet.js b/src/hyparquet.js
index b912757..51a441a 100644
--- a/src/hyparquet.js
+++ b/src/hyparquet.js
@@ -7,5 +7,5 @@ export { parquetRead }
 import { snappyUncompress } from './snappy.js'
 export { snappyUncompress }
 
-import { toJson } from './toJson.js'
+import { toJson } from './utils.js'
 export { toJson }
diff --git a/src/read.js b/src/read.js
index fc6a79e..ff2289c 100644
--- a/src/read.js
+++ b/src/read.js
@@ -2,6 +2,7 @@
 import { getColumnOffset, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
 import { getColumnName, isMapLike } from './schema.js'
+import { concat } from './utils.js'
 
 /**
  * Read parquet data rows from a file-like object.
@@ -40,7 +41,7 @@ export async function parquetRead(options) {
   const rowStart = options.rowStart || 0
   const rowEnd = options.rowEnd || Number(metadata.num_rows)
   /** @type {any[][]} */
-  let rowData = []
+  const rowData = []
 
   // find which row groups to read
   let groupStart = 0 // first row index of the current group
@@ -55,7 +56,7 @@ export async function parquetRead(options) {
       // filter to rows in range
       const start = Math.max(rowStart - groupStart, 0)
       const end = Math.min(rowEnd - groupStart, groupRows)
-      rowData = rowData.concat(groupData.slice(start, end))
+      concat(rowData, groupData.slice(start, end))
     }
   }
   groupStart += groupRows
diff --git a/src/toJson.js b/src/utils.js
similarity index 72%
rename from src/toJson.js
rename to src/utils.js
index 11c1295..e1160b0 100644
--- a/src/toJson.js
+++ b/src/utils.js
@@ -21,3 +21,15 @@ export function toJson(obj) {
   }
   return obj
 }
+
+/**
+ * Concatenate two arrays fast.
+ * @param {any[]} aaa first array, modified in place
+ * @param {any[]} bbb second array, appended to the end of aaa
+ */
+export function concat(aaa, bbb) {
+  const chunk = 10000 // spread in chunks to stay under engine argument-count limits
+  for (let i = 0; i < bbb.length; i += chunk) {
+    aaa.push(...bbb.slice(i, i + chunk))
+  }
+}
diff --git a/test/metadata.test.js b/test/metadata.test.js
index 484096d..2bed334 100644
--- a/test/metadata.test.js
+++ b/test/metadata.test.js
@@ -1,7 +1,7 @@
 import fs from 'fs'
 import { describe, expect, it } from 'vitest'
 import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js'
-import { toJson } from '../src/toJson.js'
+import { toJson } from '../src/utils.js'
 import { fileToAsyncBuffer, fileToJson, readFileToArrayBuffer } from './helpers.js'
 
 const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
diff --git a/test/read.test.js b/test/read.test.js
index f868e41..22446c8 100644
--- a/test/read.test.js
+++ b/test/read.test.js
@@ -2,7 +2,7 @@ import fs from 'fs'
 import { describe, expect, it } from 'vitest'
 import { gunzipSync } from 'zlib'
 import { parquetRead } from '../src/hyparquet.js'
-import { toJson } from '../src/toJson.js'
+import { toJson } from '../src/utils.js'
 import { fileToAsyncBuffer, fileToJson } from './helpers.js'
 
 /**
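
Why the helper pushes in chunks instead of calling aaa.push(...bbb) once, and why in-place push beats Array.prototype.concat here: concat returns a brand-new array on every call, so accumulating pages one concat at a time re-copies everything gathered so far; and a single push(...bbb) spread passes every element of bbb as a separate function argument, which overflows the stack for large pages (the exact limit is engine-dependent, on the order of 10^5 arguments). The sketch below demonstrates both points. It is illustrative only, not part of the patch: it assumes Node.js run as an ES module, imports the concat helper from the patched src/utils.js (adjust the path for your checkout), and the page counts and timings are made up rather than a benchmark of hyparquet itself.

import { concat } from './src/utils.js'

// Simulate accumulating many decoded data pages into one rowData array.
const pages = Array.from({ length: 2000 }, () =>
  Array.from({ length: 1000 }, (_, i) => i)
)

console.time('Array.prototype.concat (reallocates)')
let viaConcat = []
for (const page of pages) {
  viaConcat = viaConcat.concat(page) // copies all prior elements into a fresh array each call
}
console.timeEnd('Array.prototype.concat (reallocates)')

console.time('chunked in-place push')
const viaPush = []
for (const page of pages) {
  concat(viaPush, page) // appends into the same array, never reallocating the accumulator
}
console.timeEnd('chunked in-place push')

// The chunking matters: spreading a huge array in one call throws, because
// every spread element becomes a function argument on the call stack.
const big = new Array(1_000_000).fill(0)
try {
  viaPush.push(...big) // RangeError: Maximum call stack size exceeded
} catch (e) {
  console.log(e instanceof RangeError) // true
}

The 10,000-element chunk chosen in the patch stays comfortably below every major engine's argument limit while keeping the per-chunk slice overhead negligible.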