Skip to content

Commit ba60652

Browse files
authored
feat(fetch): add browser/node http client (#15)
Add browser/node support for fetching http request.
1 parent a4f425a commit ba60652

File tree

6 files changed

+1784
-33
lines changed

6 files changed

+1784
-33
lines changed

lib/reader.js

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const parquet_types = require('./types');
1212
const BufferReader = require('./bufferReader');
1313
const bloomFilterReader = require('./bloomFilterIO/bloomFilterReader');
1414
const groupBy = require("lodash/groupBy");
15+
const fetch = require('cross-fetch');
1516

1617
const {
1718
getBloomFiltersFor,
@@ -125,8 +126,8 @@ class ParquetReader {
125126
* a `url` property.
126127
* This function returns a new parquet reader
127128
*/
128-
static async openUrl(request, params, options) {
129-
let envelopeReader = await ParquetEnvelopeReader.openUrl(request, params, options);
129+
static async openUrl(params, options) {
130+
let envelopeReader = await ParquetEnvelopeReader.openUrl(params, options);
130131
return this.openEnvelopeReader(envelopeReader, options);
131132
}
132133

@@ -136,7 +137,9 @@ class ParquetReader {
136137
}
137138
try {
138139
await envelopeReader.readHeader();
140+
139141
let metadata = await envelopeReader.readFooter();
142+
140143
return new ParquetReader(metadata, envelopeReader, opts);
141144
} catch (err) {
142145
await envelopeReader.close();
@@ -373,7 +376,7 @@ class ParquetEnvelopeReader {
373376
return new ParquetEnvelopeReader(readFn, closeFn, fileStat, options);
374377
}
375378

376-
static async openUrl(request, params, options) {
379+
static async openUrl(params, options) {
377380
if (typeof params === 'string')
378381
params = {url: params};
379382
if (!params.url)
@@ -382,34 +385,23 @@ class ParquetEnvelopeReader {
382385
let base = params.url.split('/');
383386
base = base.slice(0, base.length-1).join('/')+'/';
384387

385-
params.encoding = params.encoding || null;
386-
387388
let defaultHeaders = params.headers || {};
388389

389-
let filesize = async () => new Promise( (resolve, reject) => {
390-
let req = request(params);
391-
req.on('response', res => {
392-
req.abort();
393-
resolve(res.headers['content-length']);
394-
});
395-
req.on('error', reject);
396-
});
390+
let filesize = async () => {
391+
392+
const { headers } = await fetch(params.url);
393+
return headers.get('Content-Length');
394+
};
397395

398-
let readFn = (offset, length, file) => {
396+
let readFn = async (offset, length, file) => {
399397
let url = file ? base+file : params.url;
400-
401398
let range = `bytes=${offset}-${offset+length-1}`;
402399
let headers = Object.assign({}, defaultHeaders, {range});
403-
let req = Object.assign({}, params, {headers, url});
404-
return new Promise( (resolve, reject) => {
405-
request(req, (err, res) => {
406-
if (err) {
407-
reject(err);
408-
} else {
409-
resolve(res.body);
410-
}
411-
});
412-
});
400+
const response = await fetch(url, { headers });
401+
const arrayBuffer = await response.arrayBuffer();
402+
const buffer = Buffer.from(arrayBuffer);
403+
404+
return buffer;
413405
};
414406

415407
let closeFn = () => ({});
@@ -436,6 +428,7 @@ class ParquetEnvelopeReader {
436428

437429
readHeader() {
438430
return this.read(0, PARQUET_MAGIC.length).then(buf => {
431+
439432
if (buf.toString() != PARQUET_MAGIC) {
440433
throw 'not valid parquet file'
441434
}
@@ -623,7 +616,9 @@ class ParquetEnvelopeReader {
623616
if (typeof this.fileSize === 'function') {
624617
this.fileSize = await this.fileSize();
625618
}
619+
626620
let trailerLen = PARQUET_MAGIC.length + 4;
621+
627622
let trailerBuf = await this.read(this.fileSize - trailerLen, trailerLen);
628623

629624
if (trailerBuf.slice(4).toString() != PARQUET_MAGIC) {

0 commit comments

Comments
 (0)