Skip to content

Commit 19707ef

Browse files
Support bloom filters in lists/nested columns (#105)
For bloom filters, treat column names as the full path and not just the first part of the path in the schema. Closes #98. With @rlaferla. Co-authored-by: Wil Wade <[email protected]>
1 parent bda4e3f commit 19707ef

File tree

4 files changed

+373
-241
lines changed

4 files changed

+373
-241
lines changed

lib/bloomFilterIO/bloomFilterReader.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ export const siftAllByteOffsets = (
117117
};
118118

119119
export const getBloomFiltersFor = async (
120-
columnNames: Array<string>,
120+
paths: Array<string>,
121121
envelopeReader: InstanceType<typeof ParquetEnvelopeReader>
122122
) => {
123123
const columnChunkDataCollection = envelopeReader.getAllColumnChunkDataFor(
124-
columnNames
124+
paths
125125
);
126126
const bloomFilterOffsetData = siftAllByteOffsets(columnChunkDataCollection);
127127
const offsetByteValues = bloomFilterOffsetData.map(

lib/reader.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,25 @@ import * as parquet_schema from './schema';
66
import * as parquet_codec from './codec';
77
import * as parquet_compression from './compression';
88
import * as parquet_types from './types';
9-
import BufferReader , { BufferReaderOptions } from './bufferReader';
9+
import BufferReader, {BufferReaderOptions} from './bufferReader';
1010
import * as bloomFilterReader from './bloomFilterIO/bloomFilterReader';
1111
import fetch from 'cross-fetch';
12-
import { ParquetCodec, Parameter,PageData, SchemaDefinition, ParquetType, FieldDefinition, ParquetField, ClientS3, ClientParameters, FileMetaDataExt, NewPageHeader, RowGroupExt, ColumnChunkExt } from './declare';
13-
import { Cursor, Options } from './codec/types';
12+
import {
13+
ParquetCodec,
14+
Parameter,
15+
PageData,
16+
SchemaDefinition,
17+
ParquetType,
18+
FieldDefinition,
19+
ParquetField,
20+
ClientS3,
21+
ClientParameters,
22+
FileMetaDataExt,
23+
NewPageHeader,
24+
RowGroupExt,
25+
ColumnChunkExt
26+
} from './declare';
27+
import {Cursor, Options} from './codec/types';
1428

1529
const {
1630
getBloomFiltersFor,
@@ -35,7 +49,7 @@ const PARQUET_RDLVL_ENCODING = 'RLE';
3549
/**
3650
* A parquet cursor is used to retrieve rows from a parquet file in order
3751
*/
38-
class ParquetCursor {
52+
class ParquetCursor {
3953

4054
metadata: FileMetaDataExt;
4155
envelopeReader: ParquetEnvelopeReader;
@@ -44,13 +58,14 @@ class ParquetCursor {
4458
rowGroup: Array<unknown>;
4559
rowGroupIndex: number;
4660
cursorIndex: number;
61+
4762
/**
4863
* Create a new parquet reader from the file metadata and an envelope reader.
4964
* It is usually not recommended to call this constructor directly except for
5065
* advanced and internal use cases. Consider using getCursor() on the
5166
* ParquetReader instead
5267
*/
53-
constructor( metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, schema: parquet_schema.ParquetSchema, columnList: Array<Array<unknown>>) {
68+
constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, schema: parquet_schema.ParquetSchema, columnList: Array<Array<unknown>>) {
5469
this.metadata = metadata;
5570
this.envelopeReader = envelopeReader;
5671
this.schema = schema;
@@ -72,9 +87,9 @@ class ParquetCursor {
7287
}
7388

7489
let rowBuffer = await this.envelopeReader.readRowGroup(
75-
this.schema,
76-
this.metadata.row_groups[this.rowGroupIndex],
77-
this.columnList);
90+
this.schema,
91+
this.metadata.row_groups[this.rowGroupIndex],
92+
this.columnList);
7893

7994
this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer);
8095
this.rowGroupIndex++;
@@ -363,7 +378,7 @@ export class ParquetReader {
363378
}
364379

365380
decodePages(buffer: Buffer, opts: Options) {
366-
return decodePages(buffer,opts);
381+
return decodePages(buffer, opts);
367382
}
368383

369384
}
@@ -375,6 +390,7 @@ export class ParquetReader {
375390
* rows from a parquet file use the ParquetReader instead
376391
*/
377392
let ParquetEnvelopeReaderIdCounter = 0;
393+
378394
export class ParquetEnvelopeReader {
379395
readFn: (offset: number, length: number, file?: string) => Promise<Buffer>;
380396
close: () => unknown;
@@ -393,7 +409,7 @@ export class ParquetEnvelopeReader {
393409
return Promise.reject('external references are not supported');
394410
}
395411

396-
return parquet_util.fread(fileDescriptor, offset, length);
412+
return parquet_util.fread(fileDescriptor, offset, length);
397413
};
398414

399415
let closeFn = parquet_util.fclose.bind(undefined, fileDescriptor);
@@ -407,15 +423,15 @@ export class ParquetEnvelopeReader {
407423
return Promise.reject('external references are not supported');
408424
}
409425

410-
return Promise.resolve(buffer.slice(offset,offset+length));
426+
return Promise.resolve(buffer.slice(offset, offset + length));
411427
};
412428

413429
let closeFn = () => ({});
414430
return new ParquetEnvelopeReader(readFn, closeFn, buffer.length, options);
415431
}
416432

417433
static async openS3(client: ClientS3, params: ClientParameters, options?: BufferReaderOptions) {
418-
let fileStat = async () => client.headObject(params).promise().then((d: {ContentLength: number}) => d.ContentLength);
434+
let fileStat = async () => client.headObject(params).promise().then((d: { ContentLength: number }) => d.ContentLength);
419435

420436
let readFn = async (offset: number, length: number, file?: string) => {
421437
if (file) {

0 commit comments

Comments (0)