|
| 1 | +# Databricks notebook source |
| 2 | +# MAGIC %pip install dlt-with-debug |
| 3 | + |
| 4 | +# COMMAND ---------- |
| 5 | + |
| 6 | +from pyspark.sql.functions import * |
| 7 | +from pyspark.sql.types import * |
| 8 | + |
| 9 | +from dlt_with_debug import dltwithdebug, pipeline_id, showoutput |
| 10 | + |
| 11 | +if pipeline_id: |
| 12 | + import dlt |
| 13 | +else: |
| 14 | + from dlt_with_debug import dlt |
| 15 | + |
| 16 | +# COMMAND ---------- |
| 17 | + |
| 18 | +json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" |
| 19 | + |
| 20 | +# COMMAND ---------- |
| 21 | + |
| 22 | +@dlt.create_table( |
| 23 | + comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.", |
| 24 | + table_properties={ |
| 25 | + "quality": "bronze" |
| 26 | + } |
| 27 | +) |
| 28 | +@dltwithdebug(globals()) |
| 29 | +def clickstream_raw(): |
| 30 | + return ( |
| 31 | + spark.read.option("inferSchema", "true").json(json_path) |
| 32 | + ) |
| 33 | + |
| 34 | +# COMMAND ---------- |
| 35 | + |
| 36 | +showoutput(clickstream_raw) |
| 37 | + |
| 38 | +# COMMAND ---------- |
| 39 | + |
| 40 | +@dlt.create_table( |
| 41 | + comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.", |
| 42 | + table_properties={ |
| 43 | + "quality": "silver" |
| 44 | + } |
| 45 | +) |
| 46 | +@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL") |
| 47 | +@dlt.expect_or_fail("valid_count", "click_count > 0") |
| 48 | +@dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"}) |
| 49 | +@dltwithdebug(globals()) |
| 50 | +def clickstream_clean(): |
| 51 | + return ( |
| 52 | + dlt.read("clickstream_raw") |
| 53 | + .withColumn("current_page_id", expr("CAST(curr_id AS INT)")) |
| 54 | + .withColumn("click_count", expr("CAST(n AS INT)")) |
| 55 | + .withColumn("previous_page_id", expr("CAST(prev_id AS INT)")) |
| 56 | + .withColumnRenamed("curr_title", "current_page_title") |
| 57 | + .withColumnRenamed("prev_title", "previous_page_title") |
| 58 | + .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title") |
| 59 | + ) |
| 60 | + |
| 61 | +# COMMAND ---------- |
| 62 | + |
| 63 | +showoutput(clickstream_clean) |
| 64 | + |
| 65 | +# COMMAND ---------- |
| 66 | + |
| 67 | + |
0 commit comments