mirror of https://github.com/asadbek064/hyparquet-compressors.git (synced 2026-01-12 05:36:37 +00:00)

Fix hadoop lz4

parent c0cb3d8159
commit 3f956e2862
@@ -1,5 +1,9 @@
# hyparquet decompressors

[workflow](https://github.com/hyparam/hyparquet-compressors/actions)
[MIT license](https://opensource.org/licenses/MIT)

This package exports a `compressors` object intended to be passed into [hyparquet](https://github.com/hyparam/hyparquet).

[Apache Parquet](https://parquet.apache.org) is a popular columnar storage format that is widely used in data engineering, data science, and machine learning applications for efficiently storing and processing large datasets. It supports a number of different compression formats, but most parquet files use snappy compression.
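Not part of the diff: a minimal usage sketch of the exported `compressors` object, modeled on the test file further down. The parquet path is illustrative, and the import specifier assumes the published package name matches the repository name.

```js
import fs from 'fs'
import { parquetRead } from 'hyparquet'
import { compressors } from 'hyparquet-compressors' // package name assumed from the repo

// hand the decompressors to hyparquet; it picks the right one per column chunk codec
const buffer = fs.readFileSync('data.parquet') // illustrative path
const file = new Uint8Array(buffer).buffer
await parquetRead({
  file,
  compressors,
  onComplete: data => console.log(data),
})
```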
@@ -1,6 +1,6 @@
import { snappyUncompressor } from 'hysnappy'
import pako from 'pako'
import { LZ4 } from './lz4.js'
import { LZ4, LZ4_RAW } from './lz4.js'

/**
 * @type {import('hyparquet').Compressors}
@@ -11,5 +11,5 @@ export const compressors = {
  BROTLI: () => new Uint8Array(), // TODO
  ZSTD: () => new Uint8Array(), // TODO
  LZ4,
  LZ4_RAW: LZ4,
  LZ4_RAW,
}
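For orientation (not in the diff): the `@type {import('hyparquet').Compressors}` annotation means every entry is a decompress function of the form `(input, outputLength) => Uint8Array`, which is the signature LZ4 and LZ4_RAW implement below. A direct call might look like this, with placeholder data:

```js
import { compressors } from './src/index.js' // path assumed relative to the repo root

// each codec entry takes compressed bytes plus the expected uncompressed length
const compressedPage = new Uint8Array([]) // placeholder; real pages come from a parquet column chunk
const page = compressors.LZ4_RAW(compressedPage, 0)
console.log(page instanceof Uint8Array) // true
```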
67 src/lz4.js
@@ -1,5 +1,6 @@
/**
 * LZ4 decompression
 * LZ4 decompression with legacy hadoop support.
 * https://github.com/apache/arrow/blob/apache-arrow-16.1.0/cpp/src/arrow/util/compression_lz4.cc#L475
 *
 * @param {Uint8Array} input
 * @param {number} outputLength
@@ -7,13 +8,55 @@
 */
export function LZ4(input, outputLength) {
  const output = new Uint8Array(outputLength)
  let len = 0 // output position
  try {
    let i = 0 // input index
    let o = 0 // output index
    while (i < input.length - 8) {
      const expectedOutputLength = input[i++] << 24 | input[i++] << 16 | input[i++] << 8 | input[i++]
      const expectedInputLength = input[i++] << 24 | input[i++] << 16 | input[i++] << 8 | input[i++]
      if (input.length - i < expectedInputLength) throw new Error('lz4 not hadoop')
      if (output.length < expectedOutputLength) throw new Error('lz4 not hadoop')

      // decompress and compare with expected
      const chunk = lz4basic(input.subarray(i, i + expectedInputLength), output, o)
      if (chunk !== expectedOutputLength) throw new Error('lz4 not hadoop')
      i += expectedInputLength
      o += expectedOutputLength

      if (i === input.length) return output
    }
    if (i < input.length) throw new Error('lz4 not hadoop')
  } catch (error) {
    if (error instanceof Error && error.message !== 'lz4 not hadoop') throw error
    // fallback to basic lz4
    lz4basic(input, output, 0)
  }
  return output
}

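Not part of the diff: the hadoop LZ4 codec that this code probes for prefixes each block with two big-endian 32-bit lengths, decompressed size first and compressed size second, which is exactly what the two shift-or chains above read. A hand-worked sketch with made-up header bytes:

```js
// 8-byte hadoop block header: 100 bytes decompressed, 42 bytes compressed (values made up)
const header = new Uint8Array([0, 0, 0, 100, 0, 0, 0, 42])
let i = 0
// big-endian reads, same as the shift-or chains in LZ4() above
const expectedOutputLength = header[i++] << 24 | header[i++] << 16 | header[i++] << 8 | header[i++]
const expectedInputLength = header[i++] << 24 | header[i++] << 16 | header[i++] << 8 | header[i++]
console.log(expectedOutputLength, expectedInputLength) // 100 42
// the 42 compressed bytes that follow would then be handed to lz4basic()
```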
/**
 * Basic LZ4 block decompression.
 *
 * @param {Uint8Array} input
 * @param {number} outputLength
 * @returns {Uint8Array}
 */
export function LZ4_RAW(input, outputLength) {
  const output = new Uint8Array(outputLength)
  lz4basic(input, output, 0)
  return output
}

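Not part of the diff: a sketch of feeding `LZ4_RAW` a tiny hand-built block, assuming it is imported from src/lz4.js. Token 0x40 encodes 4 literal bytes in its high nibble and a match length of 0 + minmatch 4 in its low nibble; the literals are "abcd" and the 2-byte little-endian offset of 4 copies them once more. (A real encoder would end the block with a literals-only sequence, which this decoder does not require.)

```js
import { LZ4_RAW } from './src/lz4.js' // assumed path

// token 0x40, literals "abcd", offset 4 (little-endian)
const block = new Uint8Array([0x40, 0x61, 0x62, 0x63, 0x64, 0x04, 0x00])
const out = LZ4_RAW(block, 8)
console.log(new TextDecoder().decode(out)) // "abcdabcd"
```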
/**
 * @param {Uint8Array} input
 * @param {Uint8Array} output
 * @param {number} outputIndex
 * @returns {number} bytes written
 */
function lz4basic(input, output, outputIndex) {
  let len = outputIndex // output position
  for (let i = 0; i < input.length;) {
    const token = input[i++]
    if (!token) {
      i += 7 // leading length
      continue
    }

    let literals = token >> 4
    if (literals) {
@@ -24,20 +67,22 @@ export function LZ4(input, outputLength) {
      output.set(input.subarray(i, i + literals), len)
      len += literals
      i += literals
      if (i >= input.length) return output
      if (i >= input.length) return len - outputIndex
    }

    const offset = input[i++] | input[i++] << 8
    if (!offset || offset > len) throw new Error(`lz4 offset out of range ${offset}`)
    if (!offset || offset > len) {
      throw new Error(`lz4 offset out of range ${offset}`)
    }
    // match length
    let matchLength = (token & 0xf) + 4
    let matchLength = (token & 0xf) + 4 // minmatch 4
    let byte = matchLength + 240
    while (byte === 255) matchLength += byte = input[i++]
    // copy match
    // TODO: fast path when no overlap
    let pos = len - offset
    const end = len + matchLength
    while (len < end) output[len++] = output[pos++]
  }

  return output
  return len - outputIndex
}

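One more illustrative case, not part of the diff: when the offset is smaller than the match length, the byte-by-byte copy loop above intentionally re-reads bytes it has just written, which is how LZ4 encodes runs. Token 0x14 is 1 literal plus a match of 4 + minmatch 4, and offset 1 expands the single literal "a" into nine of them:

```js
import { LZ4_RAW } from './src/lz4.js' // assumed path, as above

// token 0x14, literal "a", offset 1 (little-endian)
const run = new Uint8Array([0x14, 0x61, 0x01, 0x00])
const out = LZ4_RAW(run, 9)
console.log(new TextDecoder().decode(out)) // "aaaaaaaaa"
```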
10002 test/files/hadoop_lz4_compressed_larger.json (new file; diff suppressed because it is too large)
BIN test/files/hadoop_lz4_compressed_larger.parquet (new binary file, not shown)
BIN test/files/lz4_raw_compressed.parquet (new binary file, not shown)
@@ -4,10 +4,10 @@ import { describe, expect, it } from 'vitest'
import { compressors } from '../src/index.js'

describe('lz4 compressor', () => {
  it('should read lz4 compressed parquet file hadoop_lz4_compressed', async () => {
  it('read lz4 compressed parquet file hadoop_lz4_compressed', async () => {
    const buffer = fs.readFileSync('test/files/hadoop_lz4_compressed.parquet')
    const file = new Uint8Array(buffer).buffer
    const expected = fs.readFileSync('test/files/non_hadoop_lz4_compressed.json').toString()
    const expected = fs.readFileSync('test/files/lz4_compressed.json').toString()

    await parquetRead({ file, compressors, onComplete: data => {
      expect(data.length).toBe(4)
@@ -15,10 +15,32 @@ describe('lz4 compressor', () => {
    } })
  })

  it('should read lz4 compressed parquet file non_hadoop_lz4_compressed', async () => {
  it('read lz4 compressed parquet file hadoop_lz4_compressed_larger', async () => {
    const buffer = fs.readFileSync('test/files/hadoop_lz4_compressed_larger.parquet')
    const file = new Uint8Array(buffer).buffer
    const expected = fs.readFileSync('test/files/hadoop_lz4_compressed_larger.json').toString()

    await parquetRead({ file, compressors, onComplete: data => {
      expect(data.length).toBe(10000)
      expect(toJson(data)).toEqual(JSON.parse(expected))
    } })
  })

  it('read lz4 compressed parquet file lz4_raw_compressed', async () => {
    const buffer = fs.readFileSync('test/files/lz4_raw_compressed.parquet')
    const file = new Uint8Array(buffer).buffer
    const expected = fs.readFileSync('test/files/lz4_compressed.json').toString()

    await parquetRead({ file, compressors, onComplete: data => {
      expect(data.length).toBe(4)
      expect(toJson(data)).toEqual(JSON.parse(expected))
    } })
  })

  it('read lz4 compressed parquet file non_hadoop_lz4_compressed', async () => {
    const buffer = fs.readFileSync('test/files/non_hadoop_lz4_compressed.parquet')
    const file = new Uint8Array(buffer).buffer
    const expected = fs.readFileSync('test/files/non_hadoop_lz4_compressed.json').toString()
    const expected = fs.readFileSync('test/files/lz4_compressed.json').toString()

    await parquetRead({ file, compressors, onComplete: data => {
      expect(data.length).toBe(4)