Skip to content

Commit bd9767c

Browse files
committed
WIP testing Buffer issue
1 parent 58ca110 commit bd9767c

File tree

9 files changed

+59
-39
lines changed

9 files changed

+59
-39
lines changed

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
nodejs 18.18.2
1+
nodejs 18.20.4

lib/codec/encoding.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export function bitWidth(value: number): number {
5353
* @param {number} width - width of each bit-packed group
5454
* @param {number} length - length of the encoded data, in bytes (?)
5555
* @param {DecodedArray} output
56+
* @param {boolean} [disableEnvelope] - set to true to consume the entire buffer; false to assume (and therefore skip) a 4-byte header
5657
*/
5758
export function readRleBitPackedHybrid(reader: DataReader, width: number, length: number, output: DecodedArray, disableEnvelope?: boolean) {
5859

@@ -121,7 +122,7 @@ export function readBitPacked(reader: DataReader,
121122
seen: number): number {
122123
let count = header >> 1 << 3 // values to read
123124
const mask = (1 << bitWidth) - 1
124-
// when reading definition levels v2 on readColumnChunk, ArrayBuffer len is 69 only
125+
125126
let data = 0
126127
if (reader.offset < reader.view.byteLength) {
127128
data = reader.view.getUint8(reader.offset++)

lib/codec/plain_dictionary.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import {readRleBitPackedHybrid} from "./encoding";
55
export const decodeValues = function (type: string, cursor: Cursor, count: number, opts: Options) {
66
const bitWidth = cursor.buffer.subarray(cursor.offset, cursor.offset + 1).readInt8(0);
77
cursor.offset += 1;
8+
// old:
9+
// return rle.decodeValues(type, cursor, count, Object.assign({}, opts, { disableEnvelope: true, bitWidth }));
810
const reader: DataReader = {
911
view: new DataView(cursor.buffer.buffer, cursor.offset),
1012
offset: 0,
1113
}
1214
let output: DecodedArray = new Array(count);
13-
readRleBitPackedHybrid(reader, bitWidth, count, output, true)
14-
cursor.offset += reader.offset
15+
const disableEnvelope = true;
16+
readRleBitPackedHybrid(reader, bitWidth, count, output, disableEnvelope)
1517
return output;
1618
};

lib/codec/rle.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// https://github.com/apache/parquet-format/blob/master/Encodings.md
44

55
import varint from 'varint';
6-
import { Cursor } from './types';
6+
import {Cursor} from './types';
77
import {readBitPacked, readRle, readRleBitPackedHybrid, readVarInt} from "./encoding";
88

99
function encodeRunBitpacked(values: number[], opts: { bitWidth: number }) {
@@ -111,6 +111,10 @@ export const encodeValues = function (
111111
// opts.bitWidth is undefined when the boolean values are being passed
112112
// decode a bitpacked value
113113
// setting old code to true here only results in the RLE/bitpacked hybrid test failing, so we know that code is bad.
114+
// cursor: Cursor containing the data to be decoded
115+
// count: the number of values expected to result from the decoding
116+
// opts: bitWidth is required.
117+
// returns: a DecodedArray
114118
export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWidth: number }): Array<number> {
115119
const run_old_code = true;
116120
let output = new Array(count).fill(0);
@@ -127,8 +131,7 @@ export function decodeRunBitpacked(cursor: Cursor, count: number, opts: { bitWid
127131

128132
cursor.offset += opts.bitWidth * (count / 8);
129133
} else {
130-
let arrayBuf = cursor.buffer.buffer.slice(0, count);
131-
const view = new DataView(arrayBuf, cursor.offset, count);
134+
const view = new DataView(cursor.buffer.buffer, cursor.offset);
132135
const reader = {view, offset: 0}
133136
const header = readVarInt(reader);
134137
readBitPacked(reader, header, opts.bitWidth, output, 0)

lib/datapageV2.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export const readDefinitionLevelsV2 = (reader: DataReader,
4848
// V2 we know the length
4949
const values = new Array(daph2.num_values)
5050
const bitWidth = getBitWidth(dLevelMax)
51-
let disableEnvelope = daph2.definition_levels_byte_length === 0
51+
const disableEnvelope = true
5252
readRleBitPackedHybrid(reader, bitWidth, daph2.definition_levels_byte_length, values, disableEnvelope)
5353
return values
5454
}

lib/reader.ts

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s
2828
import type { Readable } from 'stream';
2929
import type { Blob } from 'buffer';
3030
import {readDefinitionLevelsV2, readRepetitionLevelsV2} from "./datapageV2";
31+
import {dataReaderFromCursor} from "./util";
3132

3233
const { getBloomFiltersFor } = bloomFilterReader;
3334

@@ -96,7 +97,6 @@ class ParquetCursor {
9697
this.columnList
9798
);
9899

99-
// now this one is *@($&ing up, it's dematerializing records
100100
this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer);
101101
this.rowGroupIndex++;
102102
this.cursorIndex = 0;
@@ -933,7 +933,7 @@ async function decodePages(buffer: Buffer, opts: Options) {
933933
pageData.values = pageData.values!.map((d) => opts.dictionary![d]);
934934
}
935935

936-
const length = pageData.rlevels !== undefined ? pageData.dlevels?.length : 0;
936+
const length = pageData.rlevels !== undefined ? pageData.rlevels?.length : 0;
937937

938938
if (pageData.rlevels?.length) {
939939
data.rlevels = pageData.rlevels;
@@ -1062,12 +1062,6 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
10621062
};
10631063
}
10641064

1065-
// ensures minimum allocation of ArrayBuffer for the DataView.
1066-
function dataViewFromCursor(cursor: Cursor, offset?: number): DataView {
1067-
// @ts-ignore
1068-
return new DataView(cursor.buffer.buffer, cursor.offset);
1069-
}
1070-
10711065
async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeader, opts: Options): Promise<Record<string, any>> {
10721066
const cursorEnd = cursor.offset + header.compressed_page_size;
10731067
const dataPageHeaderV2 = header.data_page_header_v2!;
@@ -1077,12 +1071,8 @@ async function decodeDataPageV2(cursor: Cursor, header: parquet_thrift.PageHeade
10771071
const valueEncoding = parquet_util.getThriftEnum(parquet_thrift.Encoding, dataPageHeaderV2.encoding);
10781072

10791073
/* read repetition levels */
1080-
const use_old_rlevels = false;
10811074
let rLevels: Array<any>;
1082-
let reader: DataReader = {
1083-
view: dataViewFromCursor(cursor),
1084-
offset: 0
1085-
}
1075+
let reader = dataReaderFromCursor(cursor, 0)
10861076

10871077
rLevels = readRepetitionLevelsV2(reader, dataPageHeaderV2, opts.rLevelMax || 0);
10881078
reader.offset = dataPageHeaderV2.repetition_levels_byte_length;

lib/util.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import fs, { WriteStream } from 'fs';
44
import * as parquet_thrift from '../gen-nodejs/parquet_types';
55
import { FileMetaDataExt, WriterOptions } from './declare';
66
import { Int64 } from 'thrift';
7+
import {Cursor, DataReader} from "./codec/types";
78

89
// Use this so users only need to implement the minimal amount of the WriteStream interface
910
export type WriteStreamMinimal = Pick<WriteStream, 'write' | 'end'>;
@@ -229,3 +230,9 @@ export const fieldIndexOf = function (arr: unknown[][], elem: unknown[]) {
229230
export const cloneInteger = (int: Int64) => {
230231
return new Int64(int.valueOf());
231232
};
233+
234+
235+
/**
 * Builds a DataReader whose DataView covers the bytes of `data.buffer`
 * starting at `data.offset`.
 *
 * A Node.js Buffer is a view onto an ArrayBuffer and may start at a non-zero
 * `byteOffset` (small Buffers are commonly carved out of a shared pool, and
 * `subarray`/`slice` results share their parent's memory). The view therefore
 * has to be positioned at `byteOffset + data.offset` and limited to the bytes
 * that belong to this Buffer; otherwise reads land in unrelated memory.
 *
 * @param data - cursor holding the Buffer and the current read position in it
 * @param offset - initial read offset inside the returned view (defaults to 0)
 * @returns a reader over the remaining bytes of the cursor's Buffer
 * @throws RangeError if `data.offset` lies beyond the end of the Buffer
 */
export function dataReaderFromCursor(data: Cursor, offset?: number): DataReader {
  const source = data.buffer;
  const view = new DataView(
    source.buffer,
    source.byteOffset + data.offset,
    source.byteLength - data.offset,
  );
  return { view, offset: offset ?? 0 };
}

test/codec_rle.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ function dataViewFromArray(data) {
88
const ab = new ArrayBuffer(data.length,{ maxByteLength: data.length });
99
let view = new DataView(ab, 0);
1010
data.forEach((val,idx) => view.setUint8(idx, val));
11-
return view;
11+
return view
1212
}
1313

1414
describe('ParquetCodec::RLE', function () {

test/lib/codec/rle.test.ts

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,36 @@
1-
import { expect } from 'chai';
2-
import { decodeRunBitpacked } from '../../../lib/codec/rle';
3-
import type { Cursor } from '../../../lib/codec/types';
1+
import {expect} from 'chai';
2+
import {decodeRunBitpacked} from '../../../lib/codec/rle';
3+
import {readRleBitPackedHybrid} from "../../../lib/codec/encoding";
44

55
describe('RLE Codec', function () {
6-
describe('#decodeRunBitpacked', function () {
7-
it('can decode a known bitpack value', function () {
8-
const cursor: Cursor = {
9-
// 136, 1, left off the front
10-
buffer: Buffer.from([7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]),
11-
offset: 0,
12-
};
13-
const values = decodeRunBitpacked(cursor, 24, { bitWidth: 1 });
14-
expect(values.length).equals(24);
15-
});
6+
it('can decode a known bitpack value', function () {
  // 136 = 0b10001000; 0x80 = 128 = 0b10000000
  const bitPackedDecBuffer = Buffer.from([136, 1, 7, 251, 127, 127, 28, 1, 9, 254, 251, 191, 63]);

  // Buffer.from(array) may return a slice of Node's shared allocation pool, so
  // `bitPackedDecBuffer.buffer` can be a much larger ArrayBuffer holding
  // unrelated data. An unbounded DataView over it is what made earlier debug
  // output decode to text from the compiled parquetjs library instead of the
  // bytes above. Restrict the view to this Buffer's own window.
  const myView = new DataView(
    bitPackedDecBuffer.buffer,
    bitPackedDecBuffer.byteOffset,
    bitPackedDecBuffer.byteLength,
  );

  const reader = {view: myView, offset: 0};
  const values = new Array(26).fill(0);
  const expected = [
    1, 1, 1, 1, 0, 1, 1, 0,
    0, 1, 1, 1, 0, 1, 1, 0,
    1, 1, 0, 0, 1, 1, 1, 0,
    0, 0
  ];
  readRleBitPackedHybrid(reader, 1, 13, values, true);
  expect(values.length).equals(expected.length);
  // NOTE(review): the expected values were marked "correct?" by the author and
  // have not been independently verified against the encoding spec.
  values.forEach((val, i) => {
    expect(val, `${val} != ${expected[i]} for i = ${i}`).equals(expected[i]);
  });
});
32+
33+
describe('#decodeRunBitpacked', function () {
1734
// use the example from the documentation for RLE/Bitpacked hybrid,
1835
// https://parquet.apache.org/docs/file-format/data-pages/encodings/#RLE
1936
it('writes and reads bit packed values for documentation example correctly', () => {
@@ -26,7 +43,7 @@ describe('RLE Codec', function () {
2643

2744
// the number of values? or the number of bits used for encoding a set of values ?
2845
const bitPackedRunLength = 8;
29-
const bitPackedScaledRunLength = bitPackedRunLength/8;
46+
const bitPackedScaledRunLength = bitPackedRunLength / 8;
3047
// in the grammar it says it's EITHER the left-shifted value OR 1 if the shifted value is 0?? but that makes
3148
// no sense and results in an error.
3249
const shiftedBPSRL = (bitPackedScaledRunLength << 1) | 1;
@@ -41,10 +58,10 @@ describe('RLE Codec', function () {
4158
const bitWidth = 3;
4259

4360
// number of expected values in the result = 8
44-
const cursor = { buffer: Buffer.from(view.buffer), offset: 0 }
45-
const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth} );
61+
const cursor = {buffer: Buffer.from(view.buffer), offset: 0}
62+
const values = decodeRunBitpacked(cursor, decVals.length, {bitWidth});
4663
expect(values.length).equals(decVals.length);
47-
values.forEach((val,i) => {
64+
values.forEach((val, i) => {
4865
expect(val).equals(decVals[i]);
4966
})
5067
});

0 commit comments

Comments
 (0)