Fast array concat

Kenny Daniel 2024-04-07 09:33:57 -07:00
parent 7529e8a289
commit 6ffdeca103
7 changed files with 29 additions and 14 deletions
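The pattern replaced throughout this commit is `rowData = rowData.concat(values)`: each call to Array.prototype.concat allocates a fresh array and re-copies everything accumulated so far, so accumulating a column across many pages does quadratic work in the total number of values. The new concat helper (added to utils.js at the end of this diff) appends in place instead. A minimal sketch of the difference, not taken from the commit, where `pages` stands for any hypothetical list of decoded value arrays:

// Quadratic: every .concat() copies the full accumulator into a new array.
let out = []
for (const page of pages) out = out.concat(page)

// Linear: push appends in place. The spread is chunked inside concat()
// because engines cap how many arguments a single call may receive.
const out2 = []
for (const page of pages) concat(out2, page)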

@@ -6,6 +6,7 @@ import { readDataPageV2 } from './datapageV2.js'
 import { parquetHeader } from './header.js'
 import { getMaxDefinitionLevel, getMaxRepetitionLevel, isRequired, schemaElement } from './schema.js'
 import { snappyUncompress } from './snappy.js'
+import { concat } from './utils.js'
 
 /**
  * @typedef {import('./types.js').SchemaElement} SchemaElement
@@ -31,7 +32,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 let valuesSeen = 0
 let byteOffset = 0 // byteOffset within the column
 /** @type {any[]} */
-let rowData = []
+const rowData = []
 
 while (valuesSeen < rowGroup.num_rows) {
 // parse column header
@@ -93,7 +94,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 // values.length !== daph.num_values isn't right. In cases like arrays,
 // you need the total number of children, not the number of top-level values.
-rowData = rowData.concat(values)
+concat(rowData, values)
 } else if (header.type === PageType.DICTIONARY_PAGE) {
 const diph = header.dictionary_page_header
 if (!diph) throw new Error('parquet dictionary page header is undefined')
@@ -116,7 +117,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 if (repetitionLevels.length) {
 dereferenceDictionary(dictionary, dataPage)
 // Use repetition levels to construct lists
-rowData = rowData.concat(assembleObjects(
+concat(rowData, assembleObjects(
 definitionLevels, repetitionLevels, dataPage, true, maxDefinitionLevel, maxRepetitionLevel
 ))
 } else if (daph2.num_nulls) {
@@ -125,7 +126,7 @@ export function readColumn(arrayBuffer, columnOffset, rowGroup, columnMetadata,
 skipNulls(definitionLevels, maxDefinitionLevel, dataPage, dictionary, rowData)
 } else {
 dereferenceDictionary(dictionary, dataPage)
-rowData = rowData.concat(dataPage)
+concat(rowData, dataPage)
 }
 // TODO: convert?
 } else {

@@ -1,4 +1,5 @@
 import { readVarInt } from './thrift.js'
+import { concat } from './utils.js'
 
 /**
  * Return type with bytes read.
@@ -212,14 +213,14 @@
  */
 export function readData(dataView, encoding, offset, count, bitWidth) {
 /** @type {any[]} */
-let value = []
+const value = []
 let byteLength = 0
 if (encoding === 'RLE') {
 let seen = 0
 while (seen < count) {
 const rle = readRleBitPackedHybrid(dataView, offset + byteLength, bitWidth, 0, count)
 if (!rle.value.length) break // EOF
-value = value.concat(rle.value)
+concat(value, rle.value)
 seen += rle.value.length
 byteLength += rle.byteLength
 }
@@ -249,7 +250,7 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
 byteLength += 4
 }
 /** @type {number[]} */
-let value = []
+const value = []
 const startByteLength = byteLength
 while (byteLength - startByteLength < length && value.length < numValues) {
 const [header, newOffset] = readVarInt(dataView, offset + byteLength)
@@ -257,14 +258,14 @@ export function readRleBitPackedHybrid(dataView, offset, width, length, numValue
 if ((header & 1) === 0) {
 // rle
 const rle = readRle(dataView, offset + byteLength, header, width)
-value = value.concat(rle.value)
+concat(value, rle.value)
 byteLength += rle.byteLength
 } else {
 // bit-packed
 const bitPacked = readBitPacked(
 dataView, offset + byteLength, header, width, numValues - value.length
 )
-value = value.concat(bitPacked.value)
+concat(value, bitPacked.value)
 byteLength += bitPacked.byteLength
 }
 }

@@ -7,5 +7,5 @@ export { parquetRead }
 import { snappyUncompress } from './snappy.js'
 export { snappyUncompress }
-import { toJson } from './toJson.js'
+import { toJson } from './utils.js'
 export { toJson }

@@ -2,6 +2,7 @@
 import { getColumnOffset, readColumn } from './column.js'
 import { parquetMetadataAsync } from './metadata.js'
 import { getColumnName, isMapLike } from './schema.js'
+import { concat } from './utils.js'
 
 /**
  * Read parquet data rows from a file-like object.
@@ -40,7 +41,7 @@ export async function parquetRead(options) {
 const rowStart = options.rowStart || 0
 const rowEnd = options.rowEnd || Number(metadata.num_rows)
 /** @type {any[][]} */
-let rowData = []
+const rowData = []
 
 // find which row groups to read
 let groupStart = 0 // first row index of the current group
@@ -55,7 +56,7 @@ export async function parquetRead(options) {
 // filter to rows in range
 const start = Math.max(rowStart - groupStart, 0)
 const end = Math.min(rowEnd - groupStart, groupRows)
-rowData = rowData.concat(groupData.slice(start, end))
+concat(rowData, groupData.slice(start, end))
 }
 }
 groupStart += groupRows
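A worked example of the row-range clamping above (numbers illustrative, not from the commit): with groupStart = 100, groupRows = 50, rowStart = 120, and rowEnd = 140, start = max(120 - 100, 0) = 20 and end = min(140 - 100, 50) = 40, so elements 20 through 39 of the group's data are appended to rowData.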

@@ -21,3 +21,15 @@ export function toJson(obj) {
 }
 return obj
 }
+
+/**
+ * Concatenate two arrays fast.
+ * @param {any[]} aaa first array
+ * @param {any[]} bbb second array
+ */
+export function concat(aaa, bbb) {
+  const chunk = 10000
+  for (let i = 0; i < bbb.length; i += chunk) {
+    aaa.push(...bbb.slice(i, i + chunk))
+  }
+}
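A usage sketch for the new helper (the arrays here are illustrative, not part of the commit). Chunking the spread sidesteps the RangeError that target.push(...source) can throw when source is very large, since each spread element becomes a separate call argument, while still avoiding the full re-copy that Array.prototype.concat performs:

import { concat } from './utils.js'

const target = [1, 2]
const source = new Array(1_000_000).fill(0)

// target.push(...source) may throw RangeError (too many call arguments);
// target.concat(source) would copy every element into a brand-new array.
concat(target, source) // appends in place, at most 10000 elements per push
console.log(target.length) // 1000002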

@@ -1,7 +1,7 @@
 import fs from 'fs'
 import { describe, expect, it } from 'vitest'
 import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js'
-import { toJson } from '../src/toJson.js'
+import { toJson } from '../src/utils.js'
 import { fileToAsyncBuffer, fileToJson, readFileToArrayBuffer } from './helpers.js'
 
 const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))

@@ -2,7 +2,7 @@ import fs from 'fs'
 import { describe, expect, it } from 'vitest'
 import { gunzipSync } from 'zlib'
 import { parquetRead } from '../src/hyparquet.js'
-import { toJson } from '../src/toJson.js'
+import { toJson } from '../src/utils.js'
 import { fileToAsyncBuffer, fileToJson } from './helpers.js'
 
 /**