Prepare for alternate decompressors

This commit is contained in:
Kenny Daniel 2024-02-18 16:42:58 -08:00
parent 2e12803974
commit ada7429685
No known key found for this signature in database
GPG Key ID: 90AB653A8CAD7E45
8 changed files with 70 additions and 33 deletions

@ -8,7 +8,6 @@
"eslint:recommended",
"plugin:@typescript-eslint/recommended"
],
"ignorePatterns": ["dist/"],
"plugins": ["import", "jsdoc"],
"rules": {
"@typescript-eslint/no-explicit-any": "warn",
@ -48,7 +47,7 @@
"prefer-destructuring": ["warn", {"object": true, "array": false}],
"prefer-promise-reject-errors": "error",
"quotes": ["error", "single"],
"require-await": "error",
"require-await": "warn",
"semi": ["error", "never"],
"sort-imports": ["error", {
"ignoreDeclarationSort": true,

2
.gitignore vendored

@ -4,4 +4,4 @@ coverage
dist
*.tgz
example.parquet
benchmark.parquet
.vscode

@ -1,6 +1,6 @@
# hyparquet
![hyparquet](hyparquet.jpg)
![hyparquet parakeet](hyparquet.jpg)
[![npm](https://img.shields.io/npm/v/hyparquet)](https://www.npmjs.com/package/hyparquet)
[![workflow status](https://github.com/hyparam/hyparquet/actions/workflows/ci.yml/badge.svg)](https://github.com/hyparam/hyparquet/actions)

@ -4,25 +4,26 @@ import { parquetRead } from './src/hyparquet.js'
const url = 'https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00000-of-00041.parquet'
// download test parquet file if needed
const stat = await fs.stat('benchmark.parquet').catch(() => undefined)
let stat = await fs.stat('example.parquet').catch(() => undefined)
if (!stat) {
console.log('downloading ' + url)
const res = await fetch(url)
if (!res.ok) throw new Error(res.statusText)
// write to file async
const writeStream = createWriteStream('benchmark.parquet')
const writeStream = createWriteStream('example.parquet')
for await (const chunk of res.body) {
writeStream.write(chunk)
}
// await res.body.pipeTo(writeStream)
console.log('download benchmark.parquet')
writeStream.end()
console.log('downloaded example.parquet')
stat = await fs.stat('example.parquet').catch(() => undefined)
}
// asyncBuffer
const file = {
byteLength: stat.size,
async slice(start, end) {
// read file slice
const readStream = createReadStream('benchmark.parquet', { start, end })
const readStream = createReadStream('example.parquet', { start, end })
const buffer = await readStreamToArrayBuffer(readStream)
return new Uint8Array(buffer).buffer
},

@ -132,7 +132,7 @@ function readPlainByteArray(dataView, offset, count) {
}
/**
* Read `count` fixed length byte array values.
* Read a fixed length byte array.
*
* @param {DataView} dataView - buffer to read data from
* @param {number} offset - offset to start reading from the DataView

@ -43,25 +43,24 @@ function selfCopyBytes(array, pos, offset, length) {
*
* @param {Uint8Array} inputArray compressed data
* @param {Uint8Array} outputArray output buffer
* @returns {boolean} true if successful
* @returns {void}
*/
export function snappyUncompress(inputArray, outputArray) {
const inputLength = inputArray.byteLength
const outputLength = outputArray.byteLength
let pos = 0
let outPos = 0
// skip preamble (contains uncompressed length as varint)
let uncompressedLength = 0
let shift = 0
while (pos < inputLength) {
const c = inputArray[pos]
pos += 1
uncompressedLength |= (c & 0x7f) << shift
if (c < 128) {
break
}
shift += 7
}
if (outputLength && pos >= inputLength) {
throw new Error('invalid snappy length header')
}
while (pos < inputLength) {
@ -69,6 +68,10 @@ export function snappyUncompress(inputArray, outputArray) {
let len = 0
pos += 1
if (pos >= inputLength) {
throw new Error('missing eof marker')
}
// There are two types of elements, literals and copies (back references)
if ((c & 0x3) === 0) {
// Literals are uncompressed data stored directly in the byte stream
@ -76,8 +79,7 @@ export function snappyUncompress(inputArray, outputArray) {
// Longer literal length is encoded in multiple bytes
if (len > 60) {
if (pos + 3 >= inputLength) {
console.warn('snappy error literal pos + 3 >= inputLength')
return false
throw new Error('snappy error literal pos + 3 >= inputLength')
}
const lengthSize = len - 60 // length bytes - 1
len = inputArray[pos]
@ -88,7 +90,7 @@ export function snappyUncompress(inputArray, outputArray) {
pos += lengthSize
}
if (pos + len > inputLength) {
return false // literal exceeds input length
throw new Error('snappy error literal exceeds input length')
}
copyBytes(inputArray, pos, outputArray, outPos, len)
pos += len
@ -106,7 +108,7 @@ export function snappyUncompress(inputArray, outputArray) {
case 2:
// Copy with 2-byte offset
if (inputLength <= pos + 1) {
return false // end of input
throw new Error('snappy error end of input')
}
len = (c >>> 2) + 1
offset = inputArray[pos] + (inputArray[pos + 1] << 8)
@ -115,7 +117,7 @@ export function snappyUncompress(inputArray, outputArray) {
case 3:
// Copy with 4-byte offset
if (inputLength <= pos + 3) {
return false // end of input
throw new Error('snappy error end of input')
}
len = (c >>> 2) + 1
offset = inputArray[pos]
@ -128,14 +130,17 @@ export function snappyUncompress(inputArray, outputArray) {
break
}
if (offset === 0 || isNaN(offset)) {
return false // invalid offset
throw new Error(`invalid offset ${offset} pos ${pos} inputLength ${inputLength}`)
}
if (offset > outPos) {
return false // cannot copy from before start of buffer
throw new Error('cannot copy from before start of buffer')
}
selfCopyBytes(outputArray, outPos, offset, len)
outPos += len
}
}
return true
if (outPos !== outputLength) {
throw new Error('premature end of input')
}
}

@ -8,6 +8,9 @@ describe('package.json', () => {
it('should have a valid version', () => {
expect(packageJson.version).toMatch(/^\d+\.\d+\.\d+$/)
})
it('should have MIT license', () => {
expect(packageJson.license).toBe('MIT')
})
it('should have precise dependency versions', () => {
const { devDependencies } = packageJson
Object.values(devDependencies).forEach(version => {

@ -2,27 +2,56 @@ import { describe, expect, it } from 'vitest'
import { snappyUncompress } from '../src/snappy.js'
describe('snappy uncompress', () => {
it('decompresses valid input correctly', () => {
it('decompresses valid input correctly', async () => {
const testCases = [
{ compressed: new Uint8Array([0x00]), expected: '' },
{ compressed: new Uint8Array([0x01, 0x00, 0x68]), expected: 'h' },
{ compressed: new Uint8Array([0x02, 0x04, 0x68, 0x79]), expected: 'hy' },
{ compressed: new Uint8Array([0x03, 0x08, 0x68, 0x79, 0x70]), expected: 'hyp' },
{ compressed: new Uint8Array([0x05, 0x10, 0x68, 0x79, 0x70, 0x65, 0x72]), expected: 'hyper' },
{ compressed: new Uint8Array([0x0a, 0x24, 0x68, 0x79, 0x70, 0x65, 0x72, 0x70, 0x61, 0x72, 0x61, 0x6d]), expected: 'hyperparam' },
{ compressed: new Uint8Array([0x15, 0x08, 0x68, 0x79, 0x70, 0x46, 0x03, 0x00]), expected: 'hyphyphyphyphyphyphyp' },
{
// from rowgroups.parquet
compressed: new Uint8Array([
80, 4, 1, 0, 9, 1, 0, 2, 9, 7, 4, 0, 3, 13, 8, 0, 4, 13, 8, 0, 5, 13,
8, 0, 6, 13, 8, 0, 7, 13, 8, 0, 8, 13, 8, 60, 9, 0, 0, 0, 0, 0, 0, 0,
10, 0, 0, 0, 0, 0, 0, 0,
]),
expected: new Uint8Array([
1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0,
0, 4, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0,
0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 0,
0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0,
]),
},
]
testCases.forEach(({ compressed, expected }) => {
const futures = testCases.map(async ({ compressed, expected }) => {
const outputArray = new Uint8Array(expected.length)
const result = snappyUncompress(compressed, outputArray)
const outputStr = new TextDecoder().decode(outputArray)
expect(result).toBe(true)
expect(outputStr).toBe(expected)
await snappyUncompress(compressed, outputArray)
if (typeof expected === 'string') {
const outputStr = new TextDecoder().decode(outputArray)
expect(outputStr).toBe(expected)
} else {
expect(outputArray).toEqual(expected) // Uint8Array
}
})
await Promise.all(futures)
})
it('returns false for invalid input', () => {
it('throws for invalid input', () => {
const outputArray = new Uint8Array(10)
expect(snappyUncompress(new Uint8Array([0x03, 0x61]), outputArray)).toBe(false)
expect(snappyUncompress(new Uint8Array([0x03, 0xf1]), outputArray)).toBe(false)
expect(() => snappyUncompress(new Uint8Array([]), outputArray))
.toThrow('invalid snappy length header')
expect(() => snappyUncompress(new Uint8Array([0xff]), outputArray))
.toThrow('invalid snappy length header')
expect(() => snappyUncompress(new Uint8Array([0x03, 0x61]), outputArray))
.toThrow('missing eof marker')
expect(() => snappyUncompress(new Uint8Array([0x03, 0xf1]), outputArray))
.toThrow('missing eof marker')
expect(() => snappyUncompress(new Uint8Array([0x02, 0x00, 0x68]), outputArray))
.toThrow('premature end of input')
})
})