Skip to content

Commit 2c733b5

Browse files
authored
Add ability to read decimal columns (#79)
Problem ======= Often parquet files have a column of type `decimal`. Currently `decimal` column types are not supported for reading. Solution ======== I implemented the required code to allow proper reading (read-only) of decimal columns without any external libraries. Change summary: --------------- * I made a lot of commits as this required some serious trial and error * modified `lib/codec/types.ts` to allow precision and scale properties on the `Options` interface for use when decoding column data * modified `lib/declare.ts` to allow `Decimal` in `OriginalType`, also modified `FieldDefinition` and `ParquetField` to include precision and scale. * In `plain.ts` I modified the `decodeValues_INT32` and `decodeValues_INT64` to take options so I can determine the column type and, if `DECIMAL`, call the `decodeValues_DECIMAL` function which uses the options object's configured precision and scale to decode the column * modified `lib/reader.ts` to set the `originalType`, `precision`, `scale` and name while in `decodePage`, as well as `precision` and `scale` in `decodeSchema`, to retrieve that data from the parquet file to be used while decoding data for a Decimal column * modified `lib/schema.ts` to indicate what is required from a parquet file for a decimal column in order to process it properly, as well as passing along the `precision` and `scale` if those options exist on a column * added the `DECIMAL` configuration to `PARQUET_LOGICAL_TYPES` * updated `test/decodeSchema.js` to set precision and scale to null, as they are now set to null for non-decimal types * added some Decimal-specific tests in `test/reader.js` and `test/schema.js` Steps to Verify: ---------------- 1. 
Take this code, and paste it into a file at the root of the repo with the `.js` extension: ``` const parquet = require('./dist/parquet') async function main () { const file = './test/test-files/valid-decimal-columns.parquet' await _readParquetFile(file) } async function _readParquetFile (filePath) { const reader = await parquet.ParquetReader.openFile(filePath) console.log(reader.schema) let cursor = reader.getCursor() const columnListFromFile = [] cursor.schema.fieldList.forEach((rec, i) => { columnListFromFile.push({ name: rec.name, type: rec.originalType }) }) let record = null let count = 0 const previewData = [] const columnsToRead = columnListFromFile.map(col => col.name) cursor = reader.getCursor(columnsToRead) console.log('-------------------- data --------------------') while (record = await cursor.next()) { previewData.push(record) console.log(`Row: ${count}`) console.log(record) count++ } await reader.close() } main() .catch(error => { console.error(error) process.exit(1) }) ``` 2. Run the code in a terminal using `node <your file name>.js` 3. Verify that the schema indicates 4 columns, including `over_9_digits` with scale: 7 and precision: 10, as well as a column `under_9_digits` with scale: 4, precision: 6. 4. The values of those columns should match this table: ![Screenshot 2023-04-22 at 16 53 33](https://user-images.githubusercontent.com/2294003/233810916-3d1a37da-ef22-4e1c-8e46-9961d7470e5e.png)
1 parent a011a2e commit 2c733b5

File tree

10 files changed

+175
-23
lines changed

10 files changed

+175
-23
lines changed

lib/codec/plain.ts

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,21 @@ function encodeValues_INT32(values: Array<number>) {
3535
return buf;
3636
}
3737

38-
function decodeValues_INT32(cursor: Cursor, count: number) {
38+
function decodeValues_INT32(cursor: Cursor, count: number, opts: Options) {
3939
let values = [];
40-
41-
for (let i = 0; i < count; ++i) {
42-
values.push(cursor.buffer.readInt32LE(cursor.offset));
43-
cursor.offset += 4;
40+
const name = opts.name || opts.column?.name || undefined;
41+
try {
42+
if (opts.originalType === 'DECIMAL') {
43+
values = decodeValues_DECIMAL(cursor, count, opts);
44+
} else {
45+
for (let i = 0; i < count; ++i) {
46+
values.push(cursor.buffer.readInt32LE(cursor.offset));
47+
cursor.offset += 4;
48+
}
49+
}
50+
} catch (e) {
51+
console.log(`Error thrown for column: ${name}`);
52+
throw e;
4453
}
4554

4655
return values;
@@ -55,12 +64,59 @@ function encodeValues_INT64(values: Array<number>) {
5564
return buf;
5665
}
5766

58-
function decodeValues_INT64(cursor: Cursor, count: number) {
67+
function decodeValues_INT64(cursor: Cursor, count: number, opts: Options) {
68+
let values = [];
69+
const name = opts.name || opts.column?.name || undefined;
70+
try {
71+
if (opts.originalType === 'DECIMAL' || opts.column?.originalType === 'DECIMAL') {
72+
let columnOptions: any = opts.column?.originalType ? opts.column : opts;
73+
values = decodeValues_DECIMAL(cursor, count, columnOptions);
74+
} else {
75+
for (let i = 0; i < count; ++i) {
76+
values.push(cursor.buffer.readBigInt64LE(cursor.offset));
77+
cursor.offset += 8;
78+
}
79+
}
80+
} catch (e) {
81+
console.log(`Error thrown for column: ${name}`);
82+
throw e;
83+
}
84+
85+
return values;
86+
}
87+
88+
function decodeValues_DECIMAL(cursor: Cursor, count: number, opts: Options) {
89+
let {
90+
scale,
91+
precision
92+
} = opts;
93+
94+
const name = opts.name || undefined
95+
if (!scale) {
96+
throw `missing option: scale (required for DECIMAL) for column: ${name}`;
97+
}
98+
if (!precision) {
99+
throw `missing option: precision (required for DECIMAL) for column: ${name}`;
100+
}
101+
59102
let values = [];
60103

104+
// by default we prepare the offset and bufferFunction to work with 32bit integers
105+
let offset = 4;
106+
let bufferFunction: any = (offset: number) => cursor.buffer.readInt32LE(offset);
107+
if (precision > 9) {
108+
// if the precision is over 9 digits, then we are dealing with a 64bit integer
109+
offset = 8;
110+
bufferFunction = (offset: number) => cursor.buffer.readBigInt64LE(offset);
111+
}
61112
for (let i = 0; i < count; ++i) {
62-
values.push(cursor.buffer.readBigInt64LE(cursor.offset));
63-
cursor.offset += 8;
113+
const bufferSize = cursor.size || 0
114+
if (bufferSize === 0 || cursor.offset < bufferSize) {
115+
const fullValue = bufferFunction(cursor.offset);
116+
const valueWithDecimalApplied = Number(fullValue) / Math.pow(10, scale);
117+
values.push(valueWithDecimalApplied);
118+
cursor.offset += offset;
119+
}
64120
}
65121

66122
return values;
@@ -266,10 +322,10 @@ export const decodeValues = function (
266322
return decodeValues_BOOLEAN(cursor, count);
267323

268324
case "INT32":
269-
return decodeValues_INT32(cursor, count);
325+
return decodeValues_INT32(cursor, count, opts);
270326

271327
case "INT64":
272-
return decodeValues_INT64(cursor, count);
328+
return decodeValues_INT64(cursor, count, opts);
273329

274330
case "INT96":
275331
return decodeValues_INT96(cursor, count);

lib/codec/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ export interface Options {
1818
rLevelMax?: number,
1919
dLevelMax?: number,
2020
type?: string,
21+
name?: string,
22+
precision?: number,
23+
scale?: number
2124
}
2225

2326
export interface Cursor {

lib/declare.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export type OriginalType =
2929
// | 'MAP_KEY_VALUE' // 2
3030
| 'LIST' // 3
3131
| 'ENUM' // 4
32-
// | 'DECIMAL' // 5
32+
| 'DECIMAL' // 5
3333
| 'DATE' // 6
3434
| 'TIME_MILLIS' // 7
3535
| 'TIME_MICROS' // 8
@@ -62,6 +62,8 @@ export interface FieldDefinition {
6262
statistics?: Statistics | false;
6363
parent?: ParentField
6464
num_children?: NumChildrenField
65+
precision?: number
66+
scale?: number
6567
}
6668

6769
export interface ParquetField {
@@ -74,6 +76,8 @@ export interface ParquetField {
7476
typeLength?: number;
7577
encoding?: ParquetCodec;
7678
compression?: ParquetCompression;
79+
precision?: number;
80+
scale?: number;
7781
rLevelMax: number;
7882
dLevelMax: number;
7983
isNested?: boolean;

lib/reader.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,11 @@ async function decodeDataPage(cursor: Cursor, header: parquet_thrift.PageHeader,
938938
{
939939
typeLength: opts.column!.typeLength!,
940940
bitWidth: opts.column!.typeLength!,
941-
disableEnvelope: opts.column!.disableEnvelope
941+
disableEnvelope: opts.column!.disableEnvelope,
942+
originalType: opts.column!.originalType,
943+
precision: opts.column!.precision,
944+
scale: opts.column!.scale,
945+
name: opts.column!.name
942946
});
943947

944948
cursor.offset = cursorEnd;
@@ -1084,7 +1088,9 @@ function decodeSchema(schemaElements: Array<parquet_thrift.SchemaElement>) {
10841088
type: logicalType as ParquetType,
10851089
typeLength: schemaElement.type_length,
10861090
optional: optional,
1087-
repeated: repeated
1091+
repeated: repeated,
1092+
scale: schemaElement.scale,
1093+
precision: schemaElement.precision
10881094
};
10891095
}
10901096

lib/schema.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as parquet_codec from './codec';
22
import * as parquet_compression from './compression'
33
import * as parquet_types from './types'
4-
import { SchemaDefinition, ParquetField, RepetitionType } from './declare'
4+
import { SchemaDefinition, ParquetField, RepetitionType, FieldDefinition } from './declare'
55

66
const PARQUET_COLUMN_KEY_SEPARATOR = '.';
77

@@ -119,10 +119,10 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
119119
statistics: opts.statistics,
120120
fieldCount: Object.keys(opts.fields).length,
121121
fields: buildFields(
122-
opts.fields,
123-
rLevelMax,
124-
dLevelMax,
125-
path.concat(name))
122+
opts.fields,
123+
rLevelMax,
124+
dLevelMax,
125+
path.concat(name))
126126
};
127127

128128
if (opts.type == 'LIST' || opts.type == 'MAP') fieldList[name].originalType = opts.type;
@@ -158,6 +158,10 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
158158
fieldErrors.push(`Unsupported compression method: ${opts.compression}, for Column: ${nameWithPath}`);
159159
}
160160

161+
if (typeDef.originalType === 'DECIMAL') {
162+
fieldErrors = fieldErrors.concat(errorsForDecimalOpts(typeDef.originalType, opts, nameWithPath));
163+
}
164+
161165
/* add to schema */
162166
fieldList[name] = {
163167
name: name,
@@ -168,6 +172,8 @@ function buildFields(schema: SchemaDefinition, rLevelParentMax?: number, dLevelP
168172
encoding: opts.encoding,
169173
statistics: opts.statistics,
170174
compression: opts.compression,
175+
precision: opts.precision,
176+
scale: opts.scale,
171177
typeLength: opts.typeLength || typeDef.typeLength,
172178
rLevelMax: rLevelMax,
173179
dLevelMax: dLevelMax
@@ -199,3 +205,23 @@ function listFields(fields: Record<string, ParquetField>) {
199205
function isDefined<T>(val: T | undefined): val is T {
200206
return val !== undefined;
201207
}
208+
209+
function errorsForDecimalOpts(type: string, opts: FieldDefinition, columnName: string): string[] {
210+
const fieldErrors = []
211+
if(!opts.precision) {
212+
fieldErrors.push(
213+
`invalid schema for type: ${type}, for Column: ${columnName}, precision is required`
214+
);
215+
}
216+
else if (opts.precision > 18) {
217+
fieldErrors.push(
218+
`invalid precision for type: ${type}, for Column: ${columnName}, can not handle precision over 18`
219+
);
220+
}
221+
if (!opts.scale) {
222+
fieldErrors.push(
223+
`invalid schema for type: ${type}, for Column: ${columnName}, scale is required`
224+
);
225+
}
226+
return fieldErrors
227+
}

lib/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ export const PARQUET_LOGICAL_TYPES: ParquetTypeData = {
133133
originalType: 'INT_64',
134134
toPrimitive: toPrimitive_INT64
135135
},
136+
'DECIMAL': {
137+
primitiveType: 'INT64',
138+
originalType: 'DECIMAL',
139+
toPrimitive: toPrimitive_INT64
140+
},
136141
'JSON': {
137142
primitiveType: 'BYTE_ARRAY',
138143
originalType: 'JSON',

test/decodeSchema.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,9 @@ describe('ParquetSchema', function() {
157157
"encoding": "PLAIN",
158158
"compression": "UNCOMPRESSED",
159159
"rLevelMax": 0,
160-
"dLevelMax": 0
160+
"dLevelMax": 0,
161+
"precision": null,
162+
"scale": null
161163
}
162164
}
163165
},
@@ -191,7 +193,9 @@ describe('ParquetSchema', function() {
191193
"encoding": "PLAIN",
192194
"compression": "UNCOMPRESSED",
193195
"rLevelMax": 0,
194-
"dLevelMax": 0
196+
"dLevelMax": 0,
197+
"precision": null,
198+
"scale": null
195199
},
196200
"g": {
197201
"name": "g",
@@ -209,7 +213,9 @@ describe('ParquetSchema', function() {
209213
"encoding": "PLAIN",
210214
"compression": "UNCOMPRESSED",
211215
"rLevelMax": 0,
212-
"dLevelMax": 0
216+
"dLevelMax": 0,
217+
"precision": null,
218+
"scale": null
213219
}
214220
}
215221
}
@@ -229,7 +235,9 @@ describe('ParquetSchema', function() {
229235
"encoding": "PLAIN",
230236
"compression": "UNCOMPRESSED",
231237
"rLevelMax": 0,
232-
"dLevelMax": 0
238+
"dLevelMax": 0,
239+
"precision": null,
240+
"scale": null
233241
}
234242
}
235243
}

test/reader.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,5 +107,25 @@ describe("ParquetReader", () => {
107107

108108
assert.equal(counter, 40000);
109109
})
110-
})
110+
});
111+
112+
describe("#handleDecimal", () => {
113+
it("loads parquet with columns configured as DECIMAL", async () => {
114+
const reader = await parquet.ParquetReader.openFile(
115+
path.join(__dirname,'test-files','valid-decimal-columns.parquet')
116+
);
117+
118+
const data = []
119+
for await(const record of reader) {
120+
data.push(record)
121+
}
122+
123+
assert.equal(data.length, 4);
124+
assert.equal(data[0].over_9_digits, 118.0297106);
125+
assert.equal(data[1].under_9_digits, 18.7106);
126+
// handling null values
127+
assert.equal(data[2].over_9_digits, undefined);
128+
assert.equal(data[2].under_9_digits, undefined);
129+
})
130+
});
111131
});

test/schema.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,4 +524,28 @@ describe('ParquetSchema', function() {
524524
}, 'Unsupported compression method: UNKNOWN, for Column: quantity');
525525
});
526526

527+
it('should throw error given decimal with no precision', function() {
528+
assert.throws(() => {
529+
new parquet.ParquetSchema({
530+
test_decimal_col: {type: 'DECIMAL', scale: 4},
531+
})
532+
}, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, precision is required');
533+
});
534+
535+
it('should throw error given decimal with no scale', function() {
536+
assert.throws(() => {
537+
new parquet.ParquetSchema({
538+
test_decimal_col: {type: 'DECIMAL', precision: 4},
539+
})
540+
}, 'invalid schema for type: DECIMAL, for Column: test_decimal_col, scale is required');
541+
});
542+
543+
it('should throw error given decimal with over 18 precision', function() {
544+
assert.throws(() => {
545+
new parquet.ParquetSchema({
546+
decimal_column: {type: 'DECIMAL', precision: 19, scale: 5},
547+
})
548+
}, 'invalid precision for type: DECIMAL, for Column: decimal_column, can not handle precision over 18');
549+
});
550+
527551
});
1.36 KB
Binary file not shown.

0 commit comments

Comments
 (0)