import boto3  # noqa
import datetime
+from http.client import RemoteDisconnected
import importlib
import json
import jsonschema
import logging
import os
+import random
+import requests
import scrapelib
+import time
+from urllib.error import URLError
import uuid
from collections import defaultdict, OrderedDict
from jsonschema import Draft3Validator, FormatChecker
@@ -66,18 +71,35 @@ def clean_whitespace(obj):
    return obj


+def get_random_user_agent():
+    """
+    Return a random user agent to help avoid detection.
+    """
+    user_agents = [
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
+        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
+        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1",
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.59",
+    ]
+    return random.choice(user_agents)
+
+
class Scraper(scrapelib.Scraper):
    """Base class for all scrapers"""

    def __init__(
-        self,
-        jurisdiction,
-        datadir,
-        *,
-        strict_validation=True,
-        fastmode=False,
-        realtime=False,
-        file_archiving_enabled=False,
+        self,
+        jurisdiction,
+        datadir,
+        *,
+        strict_validation=True,
+        fastmode=False,
+        realtime=False,
+        file_archiving_enabled=False,
+        http_resilience_mode=False,
    ):
        super(Scraper, self).__init__()

@@ -94,6 +116,19 @@ def __init__(
        self.retry_wait_seconds = settings.SCRAPELIB_RETRY_WAIT_SECONDS
        self.verify = settings.SCRAPELIB_VERIFY

+        # HTTP connection resilience settings
+        self.http_resilience_mode = http_resilience_mode
+        self.http_resilience_headers = {}
+        # http resilience: Set up a circuit breaker to track consecutive failures
+        self._consecutive_failures = 0
+        self._max_consecutive_failures = 3
+        self._circuit_breaker_timeout = 120  # 2 minutes
+        # http resilience: Set up connection pool reset
+        self._last_reset_time = time.time()
+        self._reset_interval = 600  # Reset connection pool every 10 minutes
+        self._random_delay_on_failure_min = 5
+        self._random_delay_on_failure_max = 15
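+        # Together these defaults drive request_resiliently() below: three
+        # consecutive failures trip a two-minute pause, the session is rebuilt
+        # every ten minutes, and connection errors add a random 5-15 second delay.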
+
        # output
        self.output_file_path = None
@@ -126,6 +161,10 @@ def __init__(
            handler = importlib.import_module(modname)
            self.scrape_output_handler = handler.Handler(self)

+        if self.http_resilience_mode:
+            self.headers["User-Agent"] = get_random_user_agent()
+            self._create_fresh_session()
+
    def push_to_queue(self):
        """Push this output to the sqs for realtime imports."""
@@ -185,9 +224,8 @@ def save_object(self, obj):

        # Remove redundant prefix
        try:
-            upload_file_path = file_path[
-                file_path.index("_data") + len("_data") + 1 :
-            ]
+            index = file_path.index("_data") + len("_data") + 1
+            upload_file_path = file_path[index:]
        except Exception:
            upload_file_path = file_path
@@ -268,6 +306,165 @@ def scrape(self, **kwargs):
            self.__class__.__name__ + " must provide a scrape() method"
        )

+    def request_resiliently(self, request_func):
+        try:
+            # Reset connection pool if needed
+            self._reset_connection_pool_if_needed()
+
+            # Add a random delay between processing items
+            self.add_random_delay(1, 3)
+
+            # If we've had too many consecutive failures, pause for a while
+            if self._consecutive_failures >= self._max_consecutive_failures:
+                self.logger.warning(
+                    f"Circuit breaker triggered after {self._consecutive_failures} consecutive failures. "
+                    f"Pausing for {self._circuit_breaker_timeout} seconds."
+                )
+                time.sleep(self._circuit_breaker_timeout)
+                self._consecutive_failures = 0
+
+                # Rotate user agent after circuit breaker timeout
+                self.headers["User-Agent"] = get_random_user_agent()
+
+            response = self.retry_on_connection_error(
+                request_func,
+                max_retries=3,
+                initial_backoff=10,
+                max_backoff=120,
+            )
+
+            # Reset consecutive failures counter on success
+            self._consecutive_failures = 0
+
+            return response
+        except Exception as e:
+            self._consecutive_failures += 1
+            self.logger.error(f"Error processing item: {e}")
+
+            # If it's a connection error, add a longer delay
+            if isinstance(e, (ConnectionError, RemoteDisconnected)):
+                self.logger.warning("Connection error. Adding longer delay.")
+                self.add_random_delay(self._random_delay_on_failure_min, self._random_delay_on_failure_max)
+
+                # Rotate user agent after connection error
+                self.headers["User-Agent"] = get_random_user_agent()
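+            # NOTE: when retry_on_connection_error() exhausts its retries, the
+            # exception is logged here but not re-raised, so this method returns
+            # None for a request that ultimately failed.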
+
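+    # get()/post() below override scrapelib's request helpers: when
+    # http_resilience_mode is enabled the call is routed through
+    # request_resiliently(), otherwise it falls through to the parent class.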
+    def get(self, url, **kwargs):
+        request_func = lambda: super(Scraper, self).get(url, **kwargs)  # noqa: E731
+        if self.http_resilience_mode:
+            return self.request_resiliently(request_func)
+        else:
+            return super().get(url, **kwargs)
+
+    def post(self, url, data=None, json=None, **kwargs):
+        request_func = lambda: super(Scraper, self).post(url, data=data, json=json, **kwargs)  # noqa: E731
+        if self.http_resilience_mode:
+            return self.request_resiliently(request_func)
+        else:
+            return super().post(url, data=data, json=json, **kwargs)
+
+    def retry_on_connection_error(self, func, max_retries=5, initial_backoff=2, max_backoff=60):
+        """
+        Retry a function call on connection errors with exponential backoff.
+
+        Args:
+            func: Function to call
+            max_retries: Maximum number of retries
+            initial_backoff: Initial backoff time in seconds
+            max_backoff: Maximum backoff time in seconds
+
+        Returns:
+            The result of the function call
+        """
+        retries = 0
+        backoff = initial_backoff
+
+        while True:
+            try:
+                return func()
+            except (
+                ConnectionError,
+                RemoteDisconnected,
+                URLError,
+                requests.exceptions.Timeout,
+                requests.exceptions.RequestException,
+            ) as e:
+                retries += 1
+                if retries > max_retries:
+                    self.logger.error(f"Max retries ({max_retries}) exceeded. Last error: {e}")
+                    raise
+
+                # Calculate backoff with jitter
+                jitter = random.uniform(0.8, 1.2)
+                current_backoff = min(backoff * jitter, max_backoff)
+
+                self.logger.warning(
+                    f"Connection error: {e}. Retrying in {current_backoff:.2f} seconds (attempt {retries}/{max_retries})"
+                )
+                time.sleep(current_backoff)
+
+                # Increase backoff for next retry
+                backoff = min(backoff * 2, max_backoff)
+
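+    # Session management: a fresh requests.Session (with its own connection
+    # pool) is created when resilience mode starts up and again whenever the
+    # reset interval elapses (see _reset_connection_pool_if_needed below).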
+    def _create_fresh_session(self):
+        """
+        Create a fresh session with appropriate settings.
+        """
+        if hasattr(self, "session"):
+            self.session.close()
+
+        # Create a new session
+        self.session = requests.Session()
+
+        # Set any custom headers
+        self.session.headers.update(self.http_resilience_headers)
+
+        # Set up retry mechanism
+        adapter = requests.adapters.HTTPAdapter(
+            max_retries=self.retry_attempts,
+            pool_connections=10,
+            pool_maxsize=10,
+            pool_block=False,
+        )
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
+
+        self.headers["User-Agent"] = get_random_user_agent()
+
+        self.logger.info(
+            f"Created fresh session with user agent: {self.headers['User-Agent']}"
+        )
+
+        return self.session
+
+    def _reset_connection_pool_if_needed(self):
+        """
+        Reset the connection pool if it's been too long since the last reset.
+        This helps prevent "Remote end closed connection without response" errors.
+        """
+        current_time = time.time()
+        if current_time - self._last_reset_time > self._reset_interval:
+            self.logger.info(
+                f"Resetting connection pool after {self._reset_interval} seconds"
+            )
+
+            # Create a fresh session
+            self._create_fresh_session()
+
+            self._last_reset_time = current_time
+
+    def add_random_delay(self, min_seconds=1, max_seconds=3):
+        """
+        Add a random delay to simulate human behavior.
+
+        Args:
+            min_seconds: Minimum delay in seconds
+            max_seconds: Maximum delay in seconds
+        """
+        delay = random.uniform(min_seconds, max_seconds)
+        self.logger.debug(f"Adding random delay of {delay:.2f} seconds")
+        time.sleep(delay)
+

class BaseBillScraper(Scraper):
    skipped = 0
@@ -400,15 +597,15 @@ def add_link(self, url, *, note=""):

class AssociatedLinkMixin(object):
    def _add_associated_link(
-        self,
-        collection,
-        note,
-        url,
-        *,
-        media_type,
-        on_duplicate="warn",
-        date="",
-        classification="",
+        self,
+        collection,
+        note,
+        url,
+        *,
+        media_type,
+        on_duplicate="warn",
+        date="",
+        classification="",
    ):
        if on_duplicate not in ["error", "ignore", "warn"]:
            raise ScrapeValueError("on_duplicate must be 'warn', 'error' or 'ignore'")
@@ -435,7 +632,7 @@ def _add_associated_link(
                seen_links.add(link["url"])

            if all(
-                ver.get(x) == item.get(x) for x in ["note", "date", "classification"]
+                ver.get(x) == item.get(x) for x in ["note", "date", "classification"]
            ):
                matches = matches + 1
                ver = item