build types before publishing to npm (#46)

* build types before publishing to npm

* use prepare instead of prepublishOnly + make it clear that we only build types

doc for prepare vs prepublishOnly is here: https://docs.npmjs.com/cli/v8/using-npm/scripts

* no jsx in this lib

* relative imports from the root, so that they work from types/

* remove unused hyparquet.d.ts + report differences to jsdoc in files

* try to understand if this is the cause of the failing CI check

tsc fails: https://github.com/hyparam/hyparquet/actions/runs/12040954822/job/33571851170?pr=46

* Revert "try to understand if this is the cause of the failing CI check"

This reverts commit 5e2fc8ca179064369de71793ab1cda3facefddc7.

* not sure what happened, but we just need to ensure the types are generated correctly

* increment version

* Explicitly export types for use in downstream TypeScript projects

* Use the new TypeScript JSDoc @import syntax for a smaller package (see the sketch after this list)

* Combine some files and use @import jsdoc

* use the local typescript
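
For reference, this is the JSDoc pattern the PR applies throughout the diff below (type names and paths taken from this repo); @import is the JSDoc tag added in TypeScript 5.5, and the ../src/ specifier is what keeps the reference resolvable from the emitted types/ directory as well:

```js
// Before: one @typedef alias per imported type, resolved relative to the file in src/
/**
 * @typedef {import('./types.d.ts').DataReader} DataReader
 * @typedef {import('./types.d.ts').DecodedArray} DecodedArray
 */

// After: a single @import tag (TypeScript 5.5+), written relative to the root
// so the same path also resolves from the generated types/ output
/**
 * @import {DataReader, DecodedArray} from '../src/types.d.ts'
 */
```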

---------

Co-authored-by: Kenny Daniel <platypii@gmail.com>
Author: Sylvain Lesage, 2024-12-02 17:47:42 +01:00 (committed by GitHub)
Parent: 3dabd14ae0
Commit: 09ae9400c5
28 changed files with 358 additions and 472 deletions

.gitignore (1 change)

@ -5,3 +5,4 @@ coverage
.vscode
.DS_Store
/*.parquet
types

@ -15,13 +15,16 @@
},
"main": "src/hyparquet.js",
"files": [
"types",
"src"
],
"type": "module",
"types": "src/hyparquet.d.ts",
"types": "types/hyparquet.d.ts",
"scripts": {
"build:types": "tsc -p ./tsconfig.build.json",
"coverage": "vitest run --coverage --coverage.include=src",
"lint": "eslint .",
"prepare": "npm run build:types",
"test": "vitest run"
},
"devDependencies": {

@ -6,8 +6,7 @@ import { isListLike, isMapLike } from './schema.js'
* Reconstructs a complex nested structure from flat arrays of definition and repetition levels,
* according to Dremel encoding.
*
* @typedef {import('./types.d.ts').DecodedArray} DecodedArray
* @typedef {import('./types.d.ts').FieldRepetitionType} FieldRepetitionType
* @import {DecodedArray, FieldRepetitionType} from '../src/types.d.ts'
* @param {any[]} output
* @param {number[] | undefined} definitionLevels
* @param {number[]} repetitionLevels
@ -104,7 +103,7 @@ export function assembleLists(
* Assemble a nested structure from subcolumn data.
* https://github.com/apache/parquet-format/blob/apache-parquet-format-2.10.0/LogicalTypes.md#nested-types
*
* @typedef {import('./types.d.ts').SchemaTree} SchemaTree
* @import {SchemaTree} from '../src/types.d.ts'
* @param {Map<string, any[]>} subcolumnData
* @param {SchemaTree} schema top-level schema element
* @param {number} [depth] depth of nested structure

@ -1,54 +0,0 @@
/**
* Returns a cached layer on top of an AsyncBuffer. For caching slices of a file
* that are read multiple times, possibly over a network.
*
* @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
* @param {AsyncBuffer} file file-like object to cache
* @returns {AsyncBuffer} cached file-like object
*/
export function cachedAsyncBuffer({ byteLength, slice }) {
const cache = new Map()
return {
byteLength,
/**
* @param {number} start
* @param {number} [end]
* @returns {import('./types.js').Awaitable<ArrayBuffer>}
*/
slice(start, end) {
const key = cacheKey(start, end, byteLength)
const cached = cache.get(key)
if (cached) return cached
// cache miss, read from file
const promise = slice(start, end)
cache.set(key, promise)
return promise
},
}
}
/**
* Returns canonical cache key for a byte range 'start,end'.
* Normalize int-range and suffix-range requests to the same key.
*
* @param {number} start start byte of range
* @param {number} [end] end byte of range, or undefined for suffix range
* @param {number} [size] size of file, or undefined for suffix range
* @returns {string}
*/
function cacheKey(start, end, size) {
if (start < 0) {
if (end !== undefined) throw new Error(`invalid suffix range [${start}, ${end}]`)
if (size === undefined) return `${start},`
return `${size + start},${size}`
} else if (end !== undefined) {
if (start > end) throw new Error(`invalid empty range [${start}, ${end}]`)
return `${start},${end}`
} else if (size === undefined) {
return `${start},`
} else {
return `${start},${size}`
}
}

@ -1,21 +1,19 @@
import { assembleLists } from './assemble.js'
import { Encoding, PageType } from './constants.js'
import { convertWithDictionary } from './convert.js'
import { decompressPage, readDataPage, readDictionaryPage } from './datapage.js'
import { readDataPageV2 } from './datapageV2.js'
import { parquetHeader } from './header.js'
import { decompressPage, readDataPage, readDataPageV2, readDictionaryPage } from './datapage.js'
import { getMaxDefinitionLevel } from './schema.js'
import { deserializeTCompactProtocol } from './thrift.js'
import { concat } from './utils.js'
/**
* Parse column data from a buffer.
*
* @typedef {import('./types.js').ColumnMetaData} ColumnMetaData
* @typedef {import('./types.js').DecodedArray} DecodedArray
* @param {import('./types.js').DataReader} reader
* @param {DataReader} reader
* @param {number} rowLimit maximum number of rows to read
* @param {ColumnMetaData} columnMetadata column metadata
* @param {import('./types.js').SchemaTree[]} schemaPath schema path for the column
* @param {import('./hyparquet.js').ParquetReadOptions} options read options
* @param {SchemaTree[]} schemaPath schema path for the column
* @param {ParquetReadOptions} options read options
* @returns {any[]} array of values
*/
export function readColumn(reader, rowLimit, columnMetadata, schemaPath, { compressors, utf8 }) {
@ -117,3 +115,61 @@ export function getColumnRange({ dictionary_page_offset, data_page_offset, total
}
return [columnOffset, columnOffset + total_compressed_size]
}
/**
* Read parquet header from a buffer.
*
* @import {ColumnMetaData, DecodedArray, DataReader, PageHeader, ParquetReadOptions, SchemaTree} from '../src/types.d.ts'
* @param {DataReader} reader - parquet file reader
* @returns {PageHeader} metadata object and bytes read
*/
function parquetHeader(reader) {
const header = deserializeTCompactProtocol(reader)
// Parse parquet header from thrift data
const type = PageType[header.field_1]
const uncompressed_page_size = header.field_2
const compressed_page_size = header.field_3
const crc = header.field_4
const data_page_header = header.field_5 && {
num_values: header.field_5.field_1,
encoding: Encoding[header.field_5.field_2],
definition_level_encoding: Encoding[header.field_5.field_3],
repetition_level_encoding: Encoding[header.field_5.field_4],
statistics: header.field_5.field_5 && {
max: header.field_5.field_5.field_1,
min: header.field_5.field_5.field_2,
null_count: header.field_5.field_5.field_3,
distinct_count: header.field_5.field_5.field_4,
max_value: header.field_5.field_5.field_5,
min_value: header.field_5.field_5.field_6,
},
}
const index_page_header = header.field_6
const dictionary_page_header = header.field_7 && {
num_values: header.field_7.field_1,
encoding: Encoding[header.field_7.field_2],
is_sorted: header.field_7.field_3,
}
const data_page_header_v2 = header.field_8 && {
num_values: header.field_8.field_1,
num_nulls: header.field_8.field_2,
num_rows: header.field_8.field_3,
encoding: Encoding[header.field_8.field_4],
definition_levels_byte_length: header.field_8.field_5,
repetition_levels_byte_length: header.field_8.field_6,
is_compressed: header.field_8.field_7 === undefined ? true : header.field_8.field_7, // default true
statistics: header.field_8.field_8,
}
return {
type,
uncompressed_page_size,
compressed_page_size,
crc,
data_page_header,
index_page_header,
dictionary_page_header,
data_page_header_v2,
}
}

@ -1,4 +1,4 @@
/** @type {import('./types.js').ParquetType[]} */
/** @type {import('../src/types.d.ts').ParquetType[]} */
export const ParquetType = [
'BOOLEAN',
'INT32',
@ -29,7 +29,7 @@ export const FieldRepetitionType = [
'REPEATED',
]
/** @type {import('./types.js').ConvertedType[]} */
/** @type {import('../src/types.d.ts').ConvertedType[]} */
export const ConvertedType = [
'UTF8',
'MAP',
@ -55,7 +55,7 @@ export const ConvertedType = [
'INTERVAL',
]
/** @type {import('./types.js').LogicalTypeType[]} */
/** @type {import('../src/types.d.ts').LogicalTypeType[]} */
export const logicalTypeType = [
'NULL',
'STRING',
@ -85,7 +85,7 @@ export const CompressionCodec = [
'LZ4_RAW',
]
/** @type {import('./types.js').PageType[]} */
/** @type {import('../src/types.d.ts').PageType[]} */
export const PageType = [
'DATA_PAGE',
'INDEX_PAGE',
@ -93,7 +93,7 @@ export const PageType = [
'DATA_PAGE_V2',
]
/** @type {import('./types.js').BoundaryOrder[]} */
/** @type {import('../src/types.d.ts').BoundaryOrder[]} */
export const BoundaryOrder = [
'UNORDERED',
'ASCENDING',

@ -3,12 +3,11 @@ const dayMillis = 86400000 // 1 day in milliseconds
/**
* Convert known types from primitive to rich, and dereference dictionary.
*
* @typedef {import('./types.js').DecodedArray} DecodedArray
* @typedef {import('./types.js').SchemaElement} SchemaElement
* @import {DecodedArray, Encoding, SchemaElement} from '../src/types.d.ts'
* @param {DecodedArray} data series of primitive types
* @param {DecodedArray | undefined} dictionary
* @param {SchemaElement} schemaElement
* @param {import('./types.js').Encoding} encoding
* @param {Encoding} encoding
* @param {boolean | undefined} utf8 decode bytes as utf8?
* @returns {DecodedArray} series of rich types
*/

@ -1,3 +1,4 @@
import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
@ -6,11 +7,6 @@ import { snappyUncompress } from './snappy.js'
/**
* Read a data page from uncompressed reader.
*
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").DataPageHeader} DataPageHeader
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {Uint8Array} bytes raw page data (should already be decompressed)
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
@ -58,7 +54,7 @@ export function readDataPage(bytes, daph, schemaPath, { type }) {
/**
* @param {Uint8Array} bytes raw page data
* @param {import("./types.d.ts").DictionaryPageHeader} diph dictionary page header
* @param {DictionaryPageHeader} diph dictionary page header
* @param {ColumnMetaData} columnMetadata
* @param {number | undefined} typeLength - type_length from schema
* @returns {DecodedArray}
@ -70,7 +66,7 @@ export function readDictionaryPage(bytes, diph, columnMetadata, typeLength) {
}
/**
* @typedef {import("./types.d.ts").DataReader} DataReader
* @import {ColumnMetaData, CompressionCodec, Compressors, DataPage, DataPageHeader, DataPageHeaderV2, DataReader, DecodedArray, DictionaryPageHeader, PageHeader, SchemaTree} from '../src/types.d.ts'
* @param {DataReader} reader data view for the page
* @param {DataPageHeader} daph data page header
* @param {SchemaTree[]} schemaPath
@ -114,8 +110,8 @@ function readDefinitionLevels(reader, daph, schemaPath) {
/**
* @param {Uint8Array} compressedBytes
* @param {number} uncompressed_page_size
* @param {import('./types.js').CompressionCodec} codec
* @param {import('./types.js').Compressors | undefined} compressors
* @param {CompressionCodec} codec
* @param {Compressors | undefined} compressors
* @returns {Uint8Array}
*/
export function decompressPage(compressedBytes, uncompressed_page_size, codec, compressors) {
@ -137,3 +133,110 @@ export function decompressPage(compressedBytes, uncompressed_page_size, codec, c
}
return page
}
/**
* Read a data page from the given Uint8Array.
*
* @param {Uint8Array} compressedBytes raw page data
* @param {PageHeader} ph page header
* @param {SchemaTree[]} schemaPath
* @param {ColumnMetaData} columnMetadata
* @param {Compressors | undefined} compressors
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
const reader = { view, offset: 0 }
const { codec, type } = columnMetadata
const daph2 = ph.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// repetition levels
const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
reader.offset = daph2.repetition_levels_byte_length // readVarInt() => len for boolean v2?
// definition levels
const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
// assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)
const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
let page = compressedBytes.subarray(reader.offset)
if (daph2.is_compressed !== false) {
page = decompressPage(page, uncompressedPageSize, codec, compressors)
}
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const pageReader = { view: pageView, offset: 0 }
// read values based on encoding
/** @type {DecodedArray} */
let dataPage
const nValues = daph2.num_values - daph2.num_nulls
if (daph2.encoding === 'PLAIN') {
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = readPlain(pageReader, type, nValues, type_length)
} else if (daph2.encoding === 'RLE') {
// assert(columnMetadata.type === 'BOOLEAN')
dataPage = new Array(nValues)
readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
dataPage = dataPage.map(x => !!x)
} else if (
daph2.encoding === 'PLAIN_DICTIONARY' ||
daph2.encoding === 'RLE_DICTIONARY'
) {
const bitWidth = pageView.getUint8(pageReader.offset++)
dataPage = new Array(nValues)
readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize - 1, dataPage)
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
const int32 = type === 'INT32'
dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues)
deltaBinaryUnpack(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'DELTA_LENGTH_BYTE_ARRAY') {
dataPage = new Array(nValues)
deltaLengthByteArray(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'DELTA_BYTE_ARRAY') {
dataPage = new Array(nValues)
deltaByteArray(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = byteStreamSplit(reader, nValues, type, type_length)
} else {
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
}
return { definitionLevels, repetitionLevels, dataPage }
}
/**
* @param {DataReader} reader
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {SchemaTree[]} schemaPath
* @returns {any[]} repetition levels
*/
function readRepetitionLevelsV2(reader, daph2, schemaPath) {
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (!maxRepetitionLevel) return []
const values = new Array(daph2.num_values)
readRleBitPackedHybrid(
reader, bitWidth(maxRepetitionLevel), daph2.repetition_levels_byte_length, values
)
return values
}
/**
* @param {DataReader} reader
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {SchemaTree[]} schemaPath
* @returns {number[] | undefined} definition levels
*/
function readDefinitionLevelsV2(reader, daph2, schemaPath) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
if (maxDefinitionLevel) {
// V2 we know the length
const values = new Array(daph2.num_values)
readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), daph2.definition_levels_byte_length, values)
return values
}
}

@ -1,117 +0,0 @@
import { decompressPage } from './datapage.js'
import { deltaBinaryUnpack, deltaByteArray, deltaLengthByteArray } from './delta.js'
import { bitWidth, byteStreamSplit, readRleBitPackedHybrid } from './encoding.js'
import { readPlain } from './plain.js'
import { getMaxDefinitionLevel, getMaxRepetitionLevel } from './schema.js'
/**
* Read a data page from the given Uint8Array.
*
* @typedef {import("./types.d.ts").DataPage} DataPage
* @typedef {import("./types.d.ts").ColumnMetaData} ColumnMetaData
* @typedef {import("./types.d.ts").Compressors} Compressors
* @typedef {import("./types.d.ts").DataPageHeaderV2} DataPageHeaderV2
* @typedef {import("./types.d.ts").SchemaTree} SchemaTree
* @param {Uint8Array} compressedBytes raw page data
* @param {import("./types.d.ts").PageHeader} ph page header
* @param {SchemaTree[]} schemaPath
* @param {ColumnMetaData} columnMetadata
* @param {Compressors | undefined} compressors
* @returns {DataPage} definition levels, repetition levels, and array of values
*/
export function readDataPageV2(compressedBytes, ph, schemaPath, columnMetadata, compressors) {
const view = new DataView(compressedBytes.buffer, compressedBytes.byteOffset, compressedBytes.byteLength)
const reader = { view, offset: 0 }
const { codec, type } = columnMetadata
const daph2 = ph.data_page_header_v2
if (!daph2) throw new Error('parquet data page header v2 is undefined')
// repetition levels
const repetitionLevels = readRepetitionLevelsV2(reader, daph2, schemaPath)
reader.offset = daph2.repetition_levels_byte_length // readVarInt() => len for boolean v2?
// definition levels
const definitionLevels = readDefinitionLevelsV2(reader, daph2, schemaPath)
// assert(reader.offset === daph2.repetition_levels_byte_length + daph2.definition_levels_byte_length)
const uncompressedPageSize = ph.uncompressed_page_size - daph2.definition_levels_byte_length - daph2.repetition_levels_byte_length
let page = compressedBytes.subarray(reader.offset)
if (daph2.is_compressed !== false) {
page = decompressPage(page, uncompressedPageSize, codec, compressors)
}
const pageView = new DataView(page.buffer, page.byteOffset, page.byteLength)
const pageReader = { view: pageView, offset: 0 }
// read values based on encoding
/** @type {import('./types.d.ts').DecodedArray} */
let dataPage
const nValues = daph2.num_values - daph2.num_nulls
if (daph2.encoding === 'PLAIN') {
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = readPlain(pageReader, type, nValues, type_length)
} else if (daph2.encoding === 'RLE') {
// assert(columnMetadata.type === 'BOOLEAN')
dataPage = new Array(nValues)
readRleBitPackedHybrid(pageReader, 1, 0, dataPage)
dataPage = dataPage.map(x => !!x)
} else if (
daph2.encoding === 'PLAIN_DICTIONARY' ||
daph2.encoding === 'RLE_DICTIONARY'
) {
const bitWidth = pageView.getUint8(pageReader.offset++)
dataPage = new Array(nValues)
readRleBitPackedHybrid(pageReader, bitWidth, uncompressedPageSize - 1, dataPage)
} else if (daph2.encoding === 'DELTA_BINARY_PACKED') {
const int32 = type === 'INT32'
dataPage = int32 ? new Int32Array(nValues) : new BigInt64Array(nValues)
deltaBinaryUnpack(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'DELTA_LENGTH_BYTE_ARRAY') {
dataPage = new Array(nValues)
deltaLengthByteArray(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'DELTA_BYTE_ARRAY') {
dataPage = new Array(nValues)
deltaByteArray(pageReader, nValues, dataPage)
} else if (daph2.encoding === 'BYTE_STREAM_SPLIT') {
const { type_length } = schemaPath[schemaPath.length - 1].element
dataPage = byteStreamSplit(reader, nValues, type, type_length)
} else {
throw new Error(`parquet unsupported encoding: ${daph2.encoding}`)
}
return { definitionLevels, repetitionLevels, dataPage }
}
/**
* @typedef {import("./types.d.ts").DataReader} DataReader
* @param {DataReader} reader
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {SchemaTree[]} schemaPath
* @returns {any[]} repetition levels
*/
export function readRepetitionLevelsV2(reader, daph2, schemaPath) {
const maxRepetitionLevel = getMaxRepetitionLevel(schemaPath)
if (!maxRepetitionLevel) return []
const values = new Array(daph2.num_values)
readRleBitPackedHybrid(
reader, bitWidth(maxRepetitionLevel), daph2.repetition_levels_byte_length, values
)
return values
}
/**
* @param {DataReader} reader
* @param {DataPageHeaderV2} daph2 data page header v2
* @param {SchemaTree[]} schemaPath
* @returns {number[] | undefined} definition levels
*/
function readDefinitionLevelsV2(reader, daph2, schemaPath) {
const maxDefinitionLevel = getMaxDefinitionLevel(schemaPath)
if (maxDefinitionLevel) {
// V2 we know the length
const values = new Array(daph2.num_values)
readRleBitPackedHybrid(reader, bitWidth(maxDefinitionLevel), daph2.definition_levels_byte_length, values)
return values
}
}

@ -1,7 +1,7 @@
import { readVarInt, readZigZagBigInt } from './thrift.js'
/**
* @typedef {import('./types.d.ts').DataReader} DataReader
* @import {DataReader} from '../src/types.d.ts'
* @param {DataReader} reader
* @param {number} count number of values to read
* @param {Int32Array | BigInt64Array} output

@ -15,8 +15,6 @@ export function bitWidth(value) {
*
* If length is zero, then read int32 length at the start.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @param {DataReader} reader
* @param {number} width - width of each bit-packed group
* @param {number} length - length of the encoded data
@ -117,7 +115,6 @@ function readBitPacked(reader, header, bitWidth, output, seen) {
}
/**
* @typedef {import("./types.d.ts").ParquetType} ParquetType
* @param {DataReader} reader
* @param {number} count
* @param {ParquetType} type
@ -149,6 +146,7 @@ export function byteStreamSplit(reader, count, type, typeLength) {
}
/**
* @import {DataReader, DecodedArray, ParquetType} from '../src/types.d.ts'
* @param {ParquetType} type
* @param {number | undefined} typeLength
* @returns {number}

@ -1,61 +0,0 @@
import { Encoding, PageType } from './constants.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
* Read parquet header from a buffer.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @typedef {import("./types.d.ts").PageHeader} PageHeader
* @param {DataReader} reader - parquet file reader
* @returns {PageHeader} metadata object and bytes read
*/
export function parquetHeader(reader) {
const header = deserializeTCompactProtocol(reader)
// Parse parquet header from thrift data
const type = PageType[header.field_1]
const uncompressed_page_size = header.field_2
const compressed_page_size = header.field_3
const crc = header.field_4
const data_page_header = header.field_5 && {
num_values: header.field_5.field_1,
encoding: Encoding[header.field_5.field_2],
definition_level_encoding: Encoding[header.field_5.field_3],
repetition_level_encoding: Encoding[header.field_5.field_4],
statistics: header.field_5.field_5 && {
max: header.field_5.field_5.field_1,
min: header.field_5.field_5.field_2,
null_count: header.field_5.field_5.field_3,
distinct_count: header.field_5.field_5.field_4,
max_value: header.field_5.field_5.field_5,
min_value: header.field_5.field_5.field_6,
},
}
const index_page_header = header.field_6
const dictionary_page_header = header.field_7 && {
num_values: header.field_7.field_1,
encoding: Encoding[header.field_7.field_2],
is_sorted: header.field_7.field_3,
}
const data_page_header_v2 = header.field_8 && {
num_values: header.field_8.field_1,
num_nulls: header.field_8.field_2,
num_rows: header.field_8.field_3,
encoding: Encoding[header.field_8.field_4],
definition_levels_byte_length: header.field_8.field_5,
repetition_levels_byte_length: header.field_8.field_6,
is_compressed: header.field_8.field_7 === undefined ? true : header.field_8.field_7, // default true
statistics: header.field_8.field_8,
}
return {
type,
uncompressed_page_size,
compressed_page_size,
crc,
data_page_header,
index_page_header,
dictionary_page_header,
data_page_header_v2,
}
}

src/hyparquet.d.ts (164 changes)

@ -1,164 +0,0 @@
import type { AsyncBuffer, CompressionCodec, Compressors, ConvertedType, FileMetaData, LogicalType, ParquetType, SchemaTree } from './types.d.ts'
export type { AsyncBuffer, CompressionCodec, Compressors, ConvertedType, FileMetaData, LogicalType, ParquetType, SchemaTree }
/**
* Read parquet data rows from a file-like object.
* Reads the minimal number of row groups and columns to satisfy the request.
*
* Returns a void promise when complete, and to throw errors.
* Data is returned in onComplete, not the return promise, because
* if onComplete is undefined, we parse the data, and emit chunks, but skip
* computing the row view directly. This saves on allocation if the caller
* wants to cache the full chunks, and make their own view of the data from
* the chunks.
*
* @param {object} options read options
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {string} [options.rowFormat] desired format of each row passed to the onComplete function
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {Function} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
* @param {Function} [options.onComplete] called when all requested rows and columns are parsed
* @param {Compressors} [options.compressor] custom decompressors
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
export function parquetRead(options: ParquetReadOptions): Promise<void>
/**
* Read parquet data and return a Promise of object-oriented row data.
*
* @param {object} options read options
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {Compressors} [options.compressor] custom decompressors
* @returns {Promise<void>} resolves when all requested rows and columns are parsed
*/
export function parquetReadObjects(options: ParquetReadOptions): Promise<Array<Record<string, any>>>
/**
* Wraps parquetRead with orderBy support.
* This is a parquet-aware query engine that can read a subset of rows and columns.
* Accepts an optional orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @param {ParquetReadOptions & { orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export function parquetQuery(options: ParquetReadOptions & { orderBy?: string }): Promise<Array<Record<string, any>>>
/**
* Read parquet metadata from an async buffer.
*
* An AsyncBuffer is like an ArrayBuffer, but the slices are loaded
* asynchronously, possibly over the network.
*
* You must provide the byteLength of the buffer, typically from a HEAD request.
*
* In theory, you could use suffix-range requests to fetch the end of the file,
* and save a round trip. But in practice, this doesn't work because chrome
* deems suffix-range requests as a not-safe-listed header, and will require
* a pre-flight. So the byteLength is required.
*
* To make this efficient, we initially request the last 512kb of the file,
* which is likely to contain the metadata. If the metadata length exceeds the
* initial fetch, 512kb, we request the rest of the metadata from the AsyncBuffer.
*
* This ensures that we either make one 512kb initial request for the metadata,
* or a second request for up to the metadata size.
*
* @param {AsyncBuffer} asyncBuffer parquet file contents
* @param {number} initialFetchSize initial fetch size in bytes (default 512kb)
* @returns {Promise<FileMetaData>} parquet metadata object
*/
export function parquetMetadataAsync(asyncBuffer: AsyncBuffer, initialFetchSize?: number): Promise<FileMetaData>
/**
* Read parquet metadata from a buffer
*
* @param {ArrayBuffer} arrayBuffer parquet file contents
* @returns {FileMetaData} parquet metadata object
*/
export function parquetMetadata(arrayBuffer: ArrayBuffer): FileMetaData
/**
* Return a tree of schema elements from parquet metadata.
*
* @param {FileMetaData} metadata parquet metadata object
* @returns {SchemaTree} tree of schema elements
*/
export function parquetSchema(metadata: FileMetaData): SchemaTree
/**
* Decompress snappy data.
* Accepts an output buffer to avoid allocating a new buffer for each call.
*
* @param {Uint8Array} input compressed data
* @param {Uint8Array} output output buffer
* @returns {boolean} true if successful
*/
export function snappyUncompress(input: Uint8Array, output: Uint8Array): boolean
/**
* Replace bigints with numbers.
* When parsing parquet files, bigints are used to represent 64-bit integers.
* However, JSON does not support bigints, so it's helpful to convert to numbers.
*
* @param {any} obj object to convert
* @returns {unknown} converted object
*/
export function toJson(obj: any): any
/**
* Construct an AsyncBuffer for a URL.
* If byteLength is not provided, will make a HEAD request to get the file size.
* If requestInit is provided, it will be passed to fetch.
*/
export function asyncBufferFromUrl({url, byteLength, requestInit}: {url: string, byteLength?: number, requestInit?: RequestInit}): Promise<AsyncBuffer>
/**
* Construct an AsyncBuffer for a local file using node fs package.
*/
export function asyncBufferFromFile(filename: string): Promise<AsyncBuffer>
/**
* Get the byte length of a URL using a HEAD request.
* If requestInit is provided, it will be passed to fetch.
*/
export function byteLengthFromUrl(url: string, requestInit?: RequestInit): Promise<number>
/**
* Returns a cached layer on top of an AsyncBuffer.
*/
export function cachedAsyncBuffer(asyncBuffer: AsyncBuffer): AsyncBuffer
/**
* Parquet query options for reading data
*/
export interface ParquetReadOptions {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
rowFormat?: string // format of each row passed to the onComplete function
rowStart?: number // inclusive
rowEnd?: number // exclusive
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
}
/**
* A run of column data
*/
export interface ColumnData {
columnName: string
columnData: ArrayLike<any>
rowStart: number
rowEnd: number
}
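
The declarations above move into JSDoc, but the documented API is unchanged; a minimal sketch of the metadata helpers they describe, with a placeholder file path:

```js
import { asyncBufferFromFile, parquetMetadataAsync, parquetSchema, toJson } from 'hyparquet'

const file = await asyncBufferFromFile('data.parquet') // placeholder path
const metadata = await parquetMetadataAsync(file) // fetches the last 512kb first, more only if the footer is larger
const schemaTree = parquetSchema(metadata) // tree of schema elements
console.log(toJson(metadata)) // bigints replaced with plain numbers so the metadata can be serialized to JSON
```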

@ -7,14 +7,12 @@ export { parquetQuery } from './query.js'
export { snappyUncompress } from './snappy.js'
export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, toJson } from './utils.js'
export { cachedAsyncBuffer } from './asyncBuffer.js'
export { asyncBufferFromFile, asyncBufferFromUrl, byteLengthFromUrl, cachedAsyncBuffer, toJson } from './utils.js'
/**
* @param {import('./hyparquet.js').ParquetReadOptions} options
* @returns {Promise<Array<Record<string, any>>>}
*/
* @param {ParquetReadOptions} options
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export function parquetReadObjects(options) {
return new Promise((onComplete, reject) => {
parquetRead({
@ -24,3 +22,41 @@ export function parquetReadObjects(options) {
}).catch(reject)
})
}
/**
* Explicitly export types for use in downstream typescript projects through
* `import { ParquetReadOptions } from 'hyparquet'` for example.
*
* @template {any} T
* @typedef {import('../src/types.d.ts').Awaitable<T>} Awaitable<T>
*/
/**
* @typedef {import('../src/types.d.ts').AsyncBuffer} AsyncBuffer
* @typedef {import('../src/types.d.ts').DataReader} DataReader
* @typedef {import('../src/types.d.ts').FileMetaData} FileMetaData
* @typedef {import('../src/types.d.ts').SchemaTree} SchemaTree
* @typedef {import('../src/types.d.ts').SchemaElement} SchemaElement
* @typedef {import('../src/types.d.ts').ParquetType} ParquetType
* @typedef {import('../src/types.d.ts').FieldRepetitionType} FieldRepetitionType
* @typedef {import('../src/types.d.ts').ConvertedType} ConvertedType
* @typedef {import('../src/types.d.ts').TimeUnit} TimeUnit
* @typedef {import('../src/types.d.ts').LogicalType} LogicalType
* @typedef {import('../src/types.d.ts').LogicalTypeType} LogicalTypeType
* @typedef {import('../src/types.d.ts').RowGroup} RowGroup
* @typedef {import('../src/types.d.ts').ColumnChunk} ColumnChunk
* @typedef {import('../src/types.d.ts').ColumnMetaData} ColumnMetaData
* @typedef {import('../src/types.d.ts').Encoding} Encoding
* @typedef {import('../src/types.d.ts').CompressionCodec} CompressionCodec
* @typedef {import('../src/types.d.ts').Compressors} Compressors
* @typedef {import('../src/types.d.ts').Statistics} Statistics
* @typedef {import('../src/types.d.ts').PageType} PageType
* @typedef {import('../src/types.d.ts').PageHeader} PageHeader
* @typedef {import('../src/types.d.ts').DataPageHeader} DataPageHeader
* @typedef {import('../src/types.d.ts').DictionaryPageHeader} DictionaryPageHeader
* @typedef {import('../src/types.d.ts').DecodedArray} DecodedArray
* @typedef {import('../src/types.d.ts').OffsetIndex} OffsetIndex
* @typedef {import('../src/types.d.ts').ColumnIndex} ColumnIndex
* @typedef {import('../src/types.d.ts').BoundaryOrder} BoundaryOrder
* @typedef {import('../src/types.d.ts').ColumnData} ColumnData
* @typedef {import('../src/types.d.ts').ParquetReadOptions} ParquetReadOptions
*/
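
A minimal sketch of what these explicit type re-exports enable in a downstream project, assuming the published package with types/hyparquet.d.ts in place; the URL and column name are placeholders:

```js
/** @import {ParquetReadOptions} from 'hyparquet' */
import { asyncBufferFromUrl, parquetReadObjects } from 'hyparquet'

/** @type {ParquetReadOptions} */
const options = {
  file: await asyncBufferFromUrl({ url: 'https://example.com/data.parquet' }), // placeholder URL
  columns: ['id'], // placeholder column name
  rowStart: 0,
  rowEnd: 10, // rows [0, 10)
}
const rows = await parquetReadObjects(options)
console.log(rows)
```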

@ -3,10 +3,9 @@ import { convertMetadata } from './metadata.js'
import { deserializeTCompactProtocol } from './thrift.js'
/**
* @typedef {import('./types.d.ts').DataReader} DataReader
* @param {DataReader} reader
* @param {import('./types.d.ts').SchemaElement} schema
* @returns {import('./types.d.ts').ColumnIndex}
* @param {SchemaElement} schema
* @returns {ColumnIndex}
*/
export function readColumnIndex(reader, schema) {
const thrift = deserializeTCompactProtocol(reader)
@ -23,7 +22,7 @@ export function readColumnIndex(reader, schema) {
/**
* @param {DataReader} reader
* @returns {import('./types.d.ts').OffsetIndex}
* @returns {OffsetIndex}
*/
export function readOffsetIndex(reader) {
const thrift = deserializeTCompactProtocol(reader)
@ -34,8 +33,9 @@ export function readOffsetIndex(reader) {
}
/**
* @import {ColumnIndex, DataReader, OffsetIndex, PageLocation, SchemaElement} from '../src/types.d.ts'
* @param {any} loc
* @returns {import('./types.d.ts').PageLocation}
* @returns {PageLocation}
*/
function pageLocation(loc) {
return {

@ -23,11 +23,8 @@ import { deserializeTCompactProtocol } from './thrift.js'
* This ensures that we either make one 512kb initial request for the metadata,
* or a second request for up to the metadata size.
*
* @typedef {import("./types.d.ts").AsyncBuffer} AsyncBuffer
* @typedef {import("./types.d.ts").FileMetaData} FileMetaData
* @typedef {import("./types.d.ts").SchemaElement} SchemaElement
* @param {AsyncBuffer} asyncBuffer parquet file contents
* @param {number} initialFetchSize initial fetch size in bytes
* @param {number} initialFetchSize initial fetch size in bytes (default 512kb)
* @returns {Promise<FileMetaData>} parquet metadata object
*/
export async function parquetMetadataAsync(asyncBuffer, initialFetchSize = 1 << 19 /* 512kb */) {
@ -190,7 +187,7 @@ export function parquetMetadata(arrayBuffer) {
* Return a tree of schema elements from parquet metadata.
*
* @param {FileMetaData} metadata parquet metadata object
* @returns {import("./types.d.ts").SchemaTree} tree of schema elements
* @returns {SchemaTree} tree of schema elements
*/
export function parquetSchema(metadata) {
return getSchemaPath(metadata.schema, [])[0]
@ -198,7 +195,7 @@ export function parquetSchema(metadata) {
/**
* @param {any} logicalType
* @returns {import("./types.d.ts").LogicalType | undefined}
* @returns {LogicalType | undefined}
*/
function logicalType(logicalType) {
if (logicalType?.field_1) return { type: 'STRING' }
@ -236,7 +233,7 @@ function logicalType(logicalType) {
/**
* @param {any} unit
* @returns {import("./types.d.ts").TimeUnit}
* @returns {TimeUnit}
*/
function timeUnit(unit) {
if (unit.field_1) return 'MILLIS'
@ -248,9 +245,10 @@ function timeUnit(unit) {
/**
* Convert column statistics based on column type.
*
* @import {AsyncBuffer, FileMetaData, LogicalType, MinMaxType, SchemaElement, SchemaTree, Statistics, TimeUnit} from '../src/types.d.ts'
* @param {any} stats
* @param {SchemaElement} schema
* @returns {import("./types.d.ts").Statistics}
* @returns {Statistics}
*/
function convertStats(stats, schema) {
return stats && {
@ -268,7 +266,7 @@ function convertStats(stats, schema) {
/**
* @param {Uint8Array | undefined} value
* @param {SchemaElement} schema
* @returns {import('./types.d.ts').MinMaxType | undefined}
* @returns {MinMaxType | undefined}
*/
export function convertMetadata(value, schema) {
const { type, converted_type, logical_type } = schema

@ -1,9 +1,7 @@
/**
* Read `count` values of the given type from the reader.view.
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @typedef {import("./types.d.ts").DecodedArray} DecodedArray
* @typedef {import("./types.d.ts").ParquetType} ParquetType
* @import {DataReader, DecodedArray, ParquetType} from '../src/types.d.ts'
* @param {DataReader} reader - buffer to read data from
* @param {ParquetType} type - parquet type of the data
* @param {number} count - number of values to read

@ -3,12 +3,12 @@ import { parquetMetadataAsync } from './metadata.js'
/**
* Wraps parquetRead with orderBy support.
* This is a parquet-aware query engine that can read a subset of rows,
* with an optional orderBy clause.
* This is a parquet-aware query engine that can read a subset of rows and columns.
* Accepts an optional orderBy column name to sort the results.
* Note that using orderBy may SIGNIFICANTLY increase the query time.
*
* @typedef {import('./hyparquet.js').ParquetReadOptions} ParquetReadOptions
* @param {ParquetReadOptions & { orderBy?: string }} options
* @returns {Promise<Record<string, any>[]>}
* @returns {Promise<Record<string, any>[]>} resolves when all requested rows and columns are parsed
*/
export async function parquetQuery(options) {
const { file, rowStart, rowEnd, orderBy } = options
@ -36,6 +36,7 @@ export async function parquetQuery(options) {
/**
* Reads a list rows from a parquet file, reading only the row groups that contain the rows.
* Returns a sparse array of rows.
* @import {ParquetReadOptions} from '../src/types.d.ts'
* @param {ParquetReadOptions & { rows: number[] }} options
* @returns {Promise<Record<string, any>[]>}
*/

@ -15,15 +15,11 @@ import { concat } from './utils.js'
* wants to cache the full chunks, and make their own view of the data from
* the chunks.
*
* @typedef {import('./hyparquet.js').ColumnData} ColumnData
* @typedef {import('./types.js').Compressors} Compressors
* @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
* @typedef {import('./types.js').FileMetaData} FileMetaData
* @param {object} options read options
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
* @param {string[]} [options.columns] columns to read, all columns if undefined
* @param {string} [options.rowFormat] format of each row passed to the onComplete function
* @param {string} [options.rowFormat] desired format of each row passed to the onComplete function
* @param {number} [options.rowStart] first requested row index (inclusive)
* @param {number} [options.rowEnd] last requested row index (exclusive)
* @param {(chunk: ColumnData) => void} [options.onChunk] called when a column chunk is parsed. chunks may include row data outside the requested range.
@ -69,7 +65,6 @@ export async function parquetRead(options) {
/**
* Read a row group from a file-like object.
*
* @typedef {import('./types.js').RowGroup} RowGroup
* @param {object} options read options
* @param {AsyncBuffer} options.file file-like object containing parquet data
* @param {FileMetaData} [options.metadata] parquet file metadata
@ -217,7 +212,8 @@ export async function readRowGroup(options, rowGroup, groupStart, rowLimit) {
/**
* Return a list of sub-columns needed to construct a top-level column.
*
* @param {import('./types.js').SchemaTree} schema
* @import {AsyncBuffer, ColumnData, Compressors, FileMetaData, RowGroup, SchemaTree} from '../src/types.d.ts'
* @param {SchemaTree} schema
* @param {string[]} output
* @returns {string[]}
*/

@ -1,8 +1,7 @@
/**
* Build a tree from the schema elements.
*
* @typedef {import('./types.js').SchemaElement} SchemaElement
* @typedef {import('./types.js').SchemaTree} SchemaTree
* @import {SchemaElement, SchemaTree} from '../src/types.d.ts'
* @param {SchemaElement[]} schema
* @param {number} rootIndex index of the root element
* @param {string[]} path path to the element

@ -19,7 +19,7 @@ const CompactType = {
/**
* Parse TCompactProtocol
*
* @typedef {import("./types.d.ts").DataReader} DataReader
* @import {DataReader} from '../src/types.d.ts'
* @param {DataReader} reader
* @returns {Record<string, any>}
*/

src/types.d.ts (26 changes)

@ -324,3 +324,29 @@ export interface ColumnIndex {
}
export type BoundaryOrder = 'UNORDERED' | 'ASCENDING' | 'DESCENDING'
/**
* A run of column data
*/
export interface ColumnData {
columnName: string
columnData: ArrayLike<any>
rowStart: number
rowEnd: number
}
/**
* Parquet query options for reading data
*/
export interface ParquetReadOptions {
file: AsyncBuffer // file-like object containing parquet data
metadata?: FileMetaData // parquet metadata, will be parsed if not provided
columns?: string[] // columns to read, all columns if undefined
rowFormat?: string // format of each row passed to the onComplete function
rowStart?: number // first requested row index (inclusive)
rowEnd?: number // last requested row index (exclusive)
onChunk?: (chunk: ColumnData) => void // called when a column chunk is parsed. chunks may be outside the requested range.
onComplete?: (rows: any[][]) => void // called when all requested rows and columns are parsed
compressors?: Compressors // custom decompressors
utf8?: boolean // decode byte arrays as utf8 strings (default true)
}

@ -1,5 +1,7 @@
/**
* Replace bigint, date, etc with legal JSON types.
* When parsing parquet files, bigints are used to represent 64-bit integers.
* However, JSON does not support bigints, so it's helpful to convert to numbers.
*
* @param {any} obj object to convert
* @returns {unknown} converted object
@ -25,7 +27,6 @@ export function toJson(obj) {
/**
* Concatenate two arrays fast.
*
* @typedef {import('./types.js').DecodedArray} DecodedArray
* @param {any[]} aaa first array
* @param {DecodedArray} bbb second array
*/
@ -38,6 +39,7 @@ export function concat(aaa, bbb) {
/**
* Get the byte length of a URL using a HEAD request.
* If requestInit is provided, it will be passed to fetch.
*
* @param {string} url
* @param {RequestInit} [requestInit] fetch options
@ -55,8 +57,9 @@ export async function byteLengthFromUrl(url, requestInit) {
/**
* Construct an AsyncBuffer for a URL.
* If byteLength is not provided, will make a HEAD request to get the file size.
* If requestInit is provided, it will be passed to fetch.
*
* @typedef {import('./types.js').AsyncBuffer} AsyncBuffer
* @param {object} options
* @param {string} options.url
* @param {number} [options.byteLength]
@ -119,3 +122,57 @@ function readStreamToArrayBuffer(input) {
input.on('error', reject)
})
}
/**
* Returns a cached layer on top of an AsyncBuffer. For caching slices of a file
* that are read multiple times, possibly over a network.
*
* @param {AsyncBuffer} file file-like object to cache
* @returns {AsyncBuffer} cached file-like object
*/
export function cachedAsyncBuffer({ byteLength, slice }) {
const cache = new Map()
return {
byteLength,
/**
* @param {number} start
* @param {number} [end]
* @returns {Awaitable<ArrayBuffer>}
*/
slice(start, end) {
const key = cacheKey(start, end, byteLength)
const cached = cache.get(key)
if (cached) return cached
// cache miss, read from file
const promise = slice(start, end)
cache.set(key, promise)
return promise
},
}
}
/**
* Returns canonical cache key for a byte range 'start,end'.
* Normalize int-range and suffix-range requests to the same key.
*
* @import {AsyncBuffer, Awaitable, DecodedArray} from '../src/types.d.ts'
* @param {number} start start byte of range
* @param {number} [end] end byte of range, or undefined for suffix range
* @param {number} [size] size of file, or undefined for suffix range
* @returns {string}
*/
function cacheKey(start, end, size) {
if (start < 0) {
if (end !== undefined) throw new Error(`invalid suffix range [${start}, ${end}]`)
if (size === undefined) return `${start},`
return `${size + start},${size}`
} else if (end !== undefined) {
if (start > end) throw new Error(`invalid empty range [${start}, ${end}]`)
return `${start},${end}`
} else if (size === undefined) {
return `${start},`
} else {
return `${start},${size}`
}
}
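
A minimal usage sketch for the relocated cachedAsyncBuffer, assuming a placeholder URL and column name; the caching pays off when the same byte ranges are sliced more than once, e.g. over the network:

```js
import { asyncBufferFromUrl, cachedAsyncBuffer, parquetMetadataAsync, parquetRead } from 'hyparquet'

const remote = await asyncBufferFromUrl({ url: 'https://example.com/data.parquet' }) // placeholder URL
const file = cachedAsyncBuffer(remote) // repeated slice(start, end) calls resolve from the in-memory Map
const metadata = await parquetMetadataAsync(file)
await parquetRead({ file, metadata, columns: ['id'], rowStart: 0, rowEnd: 10, onComplete: console.log })
// Same column chunk again: if these rows fall in the same row group, the byte range
// is identical and the second read is served from the cache, not the network.
await parquetRead({ file, metadata, columns: ['id'], rowStart: 10, rowEnd: 20, onComplete: console.log })
```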

@ -1,5 +1,5 @@
import { describe, expect, it, vi } from 'vitest'
import { cachedAsyncBuffer } from '../src/asyncBuffer.js'
import { cachedAsyncBuffer } from '../src/utils.js'
describe('cachedAsyncBuffer', () => {
it('caches slices of a file to avoid multiple reads', async () => {

@ -2,7 +2,7 @@ import { describe, expect, it } from 'vitest'
import { convert, parseFloat16 } from '../src/convert.js'
/**
* @typedef {import('../src/types.js').SchemaElement} SchemaElement
* @import {SchemaElement} from '../src/types.js'
*/
describe('convert function', () => {

@ -14,8 +14,9 @@ export function fileToJson(filePath) {
/**
* Make a DataReader from bytes
*
* @import {DataReader} from '../src/types.d.ts'
* @param {number[]} bytes
* @returns {import('../src/types.js').DataReader}
* @returns {DataReader}
*/
export function reader(bytes) {
return { view: new DataView(new Uint8Array(bytes).buffer), offset: 0 }

@ -9,7 +9,7 @@ import {
describe('Parquet schema utils', () => {
/**
* @typedef {import('../src/types.js').SchemaElement} SchemaElement
* @import {SchemaElement} from '../src/types.js'
* @type {SchemaElement[]}
*/
const schema = [

tsconfig.build.json (11 changes, new file)

@ -0,0 +1,11 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"noEmit": false,
"declaration": true,
"emitDeclarationOnly": true,
"outDir": "types",
"declarationMap": true
},
"include": ["src"]
}