utils: asyncBufferFromFile

This commit is contained in:
Kenny Daniel 2024-07-26 15:01:01 -07:00
parent 5188b3c764
commit a5122e61d6
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
7 changed files with 59 additions and 69 deletions

@ -1,7 +1,8 @@
import { createReadStream, createWriteStream, promises as fs } from 'fs'
import { createWriteStream, promises as fs } from 'fs'
import { compressors } from 'hyparquet-compressors'
import { pipeline } from 'stream/promises'
import { parquetRead } from './src/hyparquet.js'
import { asyncBufferFromFile } from './src/utils.js'
const url = 'https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00000-of-00041.parquet'
const filename = 'example.parquet'
@ -19,14 +20,7 @@ if (!stat) {
}
// asyncBuffer
const file = {
byteLength: stat.size,
async slice(start, end) {
// read file slice
const readStream = createReadStream(filename, { start, end })
return await readStreamToArrayBuffer(readStream)
},
}
const file = await asyncBufferFromFile(filename)
const startTime = performance.now()
console.log('parsing example.parquet data...')
@ -37,21 +31,3 @@ await parquetRead({
})
const ms = performance.now() - startTime
console.log(`parsed ${stat.size.toLocaleString()} bytes in ${ms.toFixed(0)} ms`)
/**
 * Collect a node ReadStream into a single ArrayBuffer.
 *
 * @param {ReadStream} input
 * @returns {Promise<ArrayBuffer>}
 */
function readStreamToArrayBuffer(input) {
  return new Promise((resolve, reject) => {
    /** @type {Buffer[]} */
    const collected = []
    input.on('error', reject)
    input.on('data', piece => { collected.push(piece) })
    input.on('end', () => {
      // join the chunks, then copy out only the bytes backing this Buffer
      const joined = Buffer.concat(collected)
      const { buffer, byteOffset, byteLength } = joined
      resolve(buffer.slice(byteOffset, byteOffset + byteLength))
    })
  })
}

@ -65,3 +65,41 @@ export async function asyncBufferFromUrl(url) {
},
}
}
/**
 * Construct an AsyncBuffer for a local file using node fs package.
 *
 * @param {string} filename
 * @returns {Promise<AsyncBuffer>}
 */
export async function asyncBufferFromFile(filename) {
  // dynamic import keeps node-only fs out of browser bundles
  const fs = await import('fs')
  const stat = await fs.promises.stat(filename)
  return {
    byteLength: stat.size,
    async slice(start, end) {
      // empty range: avoid passing a negative `end` to createReadStream
      if (end !== undefined && end <= start) return new ArrayBuffer(0)
      // fs.createReadStream treats `end` as INCLUSIVE, while AsyncBuffer.slice
      // follows ArrayBuffer.prototype.slice where `end` is exclusive, so
      // subtract one to avoid reading an extra trailing byte
      const readStream = fs.createReadStream(filename, {
        start,
        end: end === undefined ? undefined : end - 1,
      })
      return await readStreamToArrayBuffer(readStream)
    },
  }
}
/**
 * Convert a node ReadStream to ArrayBuffer.
 *
 * @param {import('stream').Readable} input
 * @returns {Promise<ArrayBuffer>}
 */
async function readStreamToArrayBuffer(input) {
  /** @type {Buffer[]} */
  const chunks = []
  // async iteration yields each chunk and rejects on stream error
  for await (const chunk of input) {
    chunks.push(chunk)
  }
  const combined = Buffer.concat(chunks)
  // copy out only the byte range backing the concatenated Buffer
  return combined.buffer.slice(combined.byteOffset, combined.byteOffset + combined.byteLength)
}

@ -1,29 +1,5 @@
import fs from 'fs'
/**
 * Helper function to read .parquet file into ArrayBuffer
 *
 * @param {string} filePath
 * @returns {Promise<ArrayBuffer>}
 */
export async function readFileToArrayBuffer(filePath) {
  const nodeBuffer = await fs.promises.readFile(filePath)
  const { buffer, byteOffset, byteLength } = nodeBuffer
  // copy just this Buffer's bytes out of its (possibly pooled) backing store
  return buffer.slice(byteOffset, byteOffset + byteLength)
}
/**
 * Wrap .parquet file in an AsyncBuffer
 *
 * @param {string} filePath
 * @returns {import('../src/types.js').AsyncBuffer}
 */
export function fileToAsyncBuffer(filePath) {
  const { size } = fs.statSync(filePath)
  return {
    byteLength: size,
    async slice(start, end) {
      // loads the whole file each call, then slices the requested range
      const whole = await readFileToArrayBuffer(filePath)
      return whole.slice(start, end)
    },
  }
}
/**
* Read .parquet file into JSON
*

@ -1,15 +1,16 @@
import fs from 'fs'
import { describe, expect, it } from 'vitest'
import { parquetMetadata, parquetMetadataAsync } from '../src/hyparquet.js'
import { toJson } from '../src/utils.js'
import { fileToAsyncBuffer, fileToJson, readFileToArrayBuffer } from './helpers.js'
import { asyncBufferFromFile, toJson } from '../src/utils.js'
import { fileToJson } from './helpers.js'
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
describe('parquetMetadata', () => {
files.forEach(file => {
it(`parse metadata from ${file}`, async () => {
const arrayBuffer = await readFileToArrayBuffer(`test/files/${file}`)
const asyncBuffer = await asyncBufferFromFile(`test/files/${file}`)
const arrayBuffer = await asyncBuffer.slice(0)
const result = toJson(parquetMetadata(arrayBuffer))
const base = file.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.metadata.json`)
@ -53,7 +54,7 @@ describe('parquetMetadata', () => {
describe('parquetMetadataAsync', () => {
files.forEach(file => {
it(`parse metadata async from ${file}`, async () => {
const asyncBuffer = fileToAsyncBuffer(`test/files/${file}`)
const asyncBuffer = await asyncBufferFromFile(`test/files/${file}`)
const result = await parquetMetadataAsync(asyncBuffer)
const base = file.replace('.parquet', '')
const expected = fileToJson(`test/files/${base}.metadata.json`)

@ -1,7 +1,6 @@
import { describe, expect, it } from 'vitest'
import { parquetRead } from '../src/hyparquet.js'
import { toJson } from '../src/utils.js'
import { fileToAsyncBuffer } from './helpers.js'
import { asyncBufferFromFile, toJson } from '../src/utils.js'
describe('parquetRead', () => {
it('throws error for undefined file', async () => {
@ -18,7 +17,7 @@ describe('parquetRead', () => {
})
it('filter by row', async () => {
const file = fileToAsyncBuffer('test/files/rowgroups.parquet')
const file = await asyncBufferFromFile('test/files/rowgroups.parquet')
await parquetRead({
file,
rowStart: 2,
@ -30,7 +29,7 @@ describe('parquetRead', () => {
})
it('filter by row overestimate', async () => {
const file = fileToAsyncBuffer('test/files/rowgroups.parquet')
const file = await asyncBufferFromFile('test/files/rowgroups.parquet')
await parquetRead({
file,
rowEnd: 100,
@ -41,7 +40,7 @@ describe('parquetRead', () => {
})
it('read a single column', async () => {
const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file,
columns: ['c'],
@ -66,7 +65,7 @@ describe('parquetRead', () => {
})
it('read a list-like column', async () => {
const file = fileToAsyncBuffer('test/files/datapage_v2.snappy.parquet')
const file = await asyncBufferFromFile('test/files/datapage_v2.snappy.parquet')
await parquetRead({
file,
columns: ['e'],
@ -91,7 +90,7 @@ describe('parquetRead', () => {
})
it('read a map-like column', async () => {
const file = fileToAsyncBuffer('test/files/nullable.impala.parquet')
const file = await asyncBufferFromFile('test/files/nullable.impala.parquet')
await parquetRead({
file,
columns: ['int_map'],

@ -2,15 +2,15 @@ import fs from 'fs'
import { compressors } from 'hyparquet-compressors'
import { describe, expect, it } from 'vitest'
import { parquetRead } from '../src/hyparquet.js'
import { toJson } from '../src/utils.js'
import { fileToAsyncBuffer, fileToJson } from './helpers.js'
import { asyncBufferFromFile, toJson } from '../src/utils.js'
import { fileToJson } from './helpers.js'
describe('parquetRead test files', () => {
const files = fs.readdirSync('test/files').filter(f => f.endsWith('.parquet'))
files.forEach(filename => {
it(`parse data from ${filename}`, async () => {
const file = fileToAsyncBuffer(`test/files/${filename}`)
const file = await asyncBufferFromFile(`test/files/${filename}`)
await parquetRead({
file,
compressors,

@ -1,11 +1,11 @@
import { describe, expect, it } from 'vitest'
import { parquetMetadata, parquetSchema } from '../src/hyparquet.js'
import { readFileToArrayBuffer } from './helpers.js'
import { parquetMetadataAsync, parquetSchema } from '../src/hyparquet.js'
import { asyncBufferFromFile } from '../src/utils.js'
describe('parquetSchema', () => {
it('parse schema tree from rowgroups.parquet', async () => {
const arrayBuffer = await readFileToArrayBuffer('test/files/rowgroups.parquet')
const metadata = parquetMetadata(arrayBuffer)
const arrayBuffer = await asyncBufferFromFile('test/files/rowgroups.parquet')
const metadata = await parquetMetadataAsync(arrayBuffer)
const result = parquetSchema(metadata)
expect(result).toEqual(rowgroupsSchema)
})