# Databricks notebook source # MAGIC %md # MAGIC # MAGIC # MAGIC ``` # MAGIC # MAGIC Name: nb-different-file-formats # MAGIC # MAGIC Design Phase: # MAGIC Author: John Miner # MAGIC Date: 11-24-2022 # MAGIC Purpose: Teach readers the following SPARK topics # MAGIC # MAGIC Learning Guide: # MAGIC # MAGIC 0 - Define funcitons # MAGIC 1 - Create logical lake in local dbfs # MAGIC # MAGIC 2 - Load airline data as temp view # MAGIC 3 - Load airport data as temp view # MAGIC 4 - Load departure delays data as temp view # MAGIC 5 - Load airplane data as temp view # MAGIC # MAGIC 6 - Explore datasets with spark SQL # MAGIC # MAGIC 7 - Save airline data as table # MAGIC 8 - Save airport data as table # MAGIC 9 - Save airplane data as table # MAGIC # MAGIC 10 - Try different partitioned file formats (arvo, orc, json, parquet, csv, delta) # MAGIC 11 - Create external tables off partitioned files # MAGIC 12 - query by partition # MAGIC 13 - query across partition # MAGIC # MAGIC ``` # COMMAND ---------- # https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html # https://www.stat.purdue.edu/~lfindsen/stat350/airline2008_dataset_definition.pdf # https://www.nationsonline.org/oneworld/IATA_Codes/airport_code_list.htm # https://sparkbyexamples.com/spark/spark-read-orc-file-into-dataframe/ # https://spark.apache.org/docs/latest/sql-data-sources-orc.html # COMMAND ---------- # # 0 - define functions # # COMMAND ---------- # # delete_dir - remove all content in dir # def delete_dir(dirname): try: # get list of files + dirs files = dbutils.fs.ls(dirname) # for each object for f in files: # recursive call if object is a directory if f.isDir(): delete_dir(f.path) # remove file or dir dbutils.fs.rm(f.path, recurse=True) # remove top most dir dbutils.fs.rm(dirname, recurse=True) except: pass # COMMAND ---------- # # get_file_info - return file cnt + size in gb # def get_file_info(dirname): # variables lst_vars = [0, 0] # get array of files / dirs files = dbutils.fs.ls(dirname) # for each object for f in files: # object = dir if f.isDir(): # recursive function call r = get_file_info(f.path) # save size / number lst_vars[0] += r['size'] lst_vars[1] += r['number'] # object <> dir, save size / number lst_vars[0] += f.size lst_vars[1] += 1 # return data return {'size': lst_vars[0], 'number': lst_vars[1]} # COMMAND ---------- # # update_file_dict – update dictionary info # def update_file_dict(type_dict, dir_dict): # get size in bytes, convert size to gigabytes old = dir_dict.get("size") new = (old / 1024 / 1024 / 1024) # update size value, add type element dir_dict.update({'size': new}) dir_dict.update(type_dict) # return the results return dir_dict # COMMAND ---------- # # 1 - create logical data lake # # COMMAND ---------- delete_dir("/lake2022") # COMMAND ---------- # MAGIC %sh # MAGIC ls /dbfs/lake2022/bronze # COMMAND ---------- # MAGIC %sh # MAGIC mkdir /dbfs/lake2022 # MAGIC mkdir /dbfs/lake2022/bronze # COMMAND ---------- # MAGIC %fs # MAGIC ls /lake2022/bronze # COMMAND ---------- # MAGIC %fs # MAGIC ls /databricks-datasets/asa/airlines/ # COMMAND ---------- # # get file list (csv) # info = update_file_dict({'type': 'csv'}, get_file_info("/databricks-datasets/asa/airlines/")) print(info) # COMMAND ---------- # # 2 - Read airline (performance) data # # file location path = "/databricks-datasets/asa/airlines/*.csv" # make dataframe df1 = spark.read.format("csv") \ .option("inferSchema", "true") \ .option("header", "true") \ .option("sep", ",") \ .load(path) # make temp hive view df1.createOrReplaceTempView("tmp_airline_data") # show schema df1.printSchema() # COMMAND ---------- # # 3 - Read plane data # # file location path = "/databricks-datasets/asa/planes/*.csv" # make dataframe df2 = spark.read.format("csv") \ .option("inferSchema", "true") \ .option("header", "true") \ .option("sep", ",") \ .load(path) # make temp hive view df2.createOrReplaceTempView("tmp_plane_data") # show schema df2.printSchema() # COMMAND ---------- # MAGIC %fs # MAGIC ls /databricks-datasets/flights/ # COMMAND ---------- # # 4 - Read departure + delay data # # file location path = "/databricks-datasets/flights/departuredelays.csv" # make dataframe df0 = spark.read.format("csv") \ .option("inferSchema", "true") \ .option("header", "true") \ .option("sep", ",") \ .load(path) # make temp hive view df0.createOrReplaceTempView("tmp_flight_delays") # show schema df0.printSchema() # COMMAND ---------- # # 5 - Read airport codes # # file location path = "/databricks-datasets/flights/airport-codes-na.txt" # make dataframe df3 = spark.read.format("csv") \ .option("inferSchema", "true") \ .option("header", "true") \ .option("sep", "\t") \ .load(path) # make temp hive view df3.createOrReplaceTempView("tmp_airport_codes") # show schema df3.printSchema() # COMMAND ---------- # # 6 - Explore datasets with spark SQL # # COMMAND ---------- # MAGIC %sql # MAGIC select * from tmp_flight_delays limit 5 # COMMAND ---------- # MAGIC %sql # MAGIC select # MAGIC Year * 100 + Month as FltHash, # MAGIC Year as FltYear, # MAGIC Month as FltMonth, # MAGIC DayOfMonth as FltDay, # MAGIC DepTime, # MAGIC ArrTime, # MAGIC FlightNum, # MAGIC TailNum, # MAGIC ActualElapsedTime as ElapsedTime, # MAGIC ArrDelay, # MAGIC DepDelay, # MAGIC Origin, # MAGIC Dest, # MAGIC Distance as FltDist # MAGIC from tmp_airline_data # MAGIC where TailNum <> 'NA' # MAGIC limit 5 # COMMAND ---------- # MAGIC %sql # MAGIC select * from tmp_plane_data where type is not null limit 5 # COMMAND ---------- # MAGIC %sql # MAGIC select * from tmp_airport_codes limit 5 # COMMAND ---------- # MAGIC %sql # MAGIC -- Get record counts # MAGIC select 'codes' as label, count(*) as total from tmp_airport_codes # MAGIC union # MAGIC select 'airports' as label, count(*) as total from tmp_airline_data # MAGIC union # MAGIC select 'planes' as label, count(*) as total from tmp_plane_data # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- Find unmatched airport codes # MAGIC select distinct c1.city, c2.city, a.Origin, a.Dest # MAGIC from tmp_airline_data as a # MAGIC left join tmp_airport_codes as c1 on a.Origin = c1.IATA # MAGIC left join tmp_airport_codes as c2 on a.Dest = c2.IATA # MAGIC where c1.city is null or c2.city is null # MAGIC limit 5 # COMMAND ---------- # MAGIC %sql # MAGIC -- Find unmatched airport codes # MAGIC select count(*) as total # MAGIC from tmp_airline_data as a # MAGIC left join tmp_airport_codes as c1 on a.Origin = c1.IATA # MAGIC left join tmp_airport_codes as c2 on a.Dest = c2.IATA # MAGIC where c1.city is null or c2.city is null # COMMAND ---------- # MAGIC %sql # MAGIC -- Bad tail numbers # MAGIC select count(*) as total # MAGIC from tmp_airline_data as a # MAGIC where a.tailnum in ('NA','UNKNOWN','�NKNO�','null') # COMMAND ---------- # MAGIC %sql # MAGIC -- Find unmatched airport codes # MAGIC select count(*) as total # MAGIC from tmp_airline_data as a # MAGIC left join tmp_plane_data as p on a.tailnum = p.tailnum # MAGIC where a.tailnum not in ('NA','UNKNOWN','�NKNO�','null') and p.tailnum is null # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- Find unmatched airport codes (origin) # MAGIC select distinct c.city, a.Origin from tmp_airline_data as a # MAGIC left join tmp_airport_codes as c on a.Origin = c.IATA # MAGIC where c.city is null # MAGIC # MAGIC union # MAGIC # MAGIC -- Find unmatched airport codes (destination) # MAGIC select distinct c.city, a.Dest from tmp_airline_data as a # MAGIC left join tmp_airport_codes as c on a.Dest = c.IATA # MAGIC where c.city is null # COMMAND ---------- # # 7A - re-create database # # drop database sql_stmt = "DROP DATABASE IF EXISTS sparktips CASCADE;" spark.sql(sql_stmt) # drop database sql_stmt = "CREATE DATABASE sparktips;" spark.sql(sql_stmt) # COMMAND ---------- # # 7B - format airline data in df # # create table sql_stmt = """ select Year * 100 + Month as FltHash, Year as FltYear, Month as FltMonth, DayOfMonth as FltDay, DepTime, ArrTime, FlightNum, TailNum, ActualElapsedTime as ElapsedTime, ArrDelay, DepDelay, Origin, Dest, Distance as FltDist from tmp_airline_data """ # grab data frame df4 = spark.sql(sql_stmt) # row count print(df4.count()) # COMMAND ---------- # # 7C - Write airline data (managed hive table) # # drop table sql_stmt = "DROP TABLE IF EXISTS sparktips.mt_airline_data" spark.sql(sql_stmt) # write as managed table df4.write.format("delta").partitionBy("FltHash").saveAsTable("sparktips.mt_airline_data") # COMMAND ---------- # MAGIC %sql # MAGIC describe table extended sparktips.mt_airline_data # COMMAND ---------- # MAGIC %fs # MAGIC ls dbfs:/user/hive/warehouse/sparktips.db/mt_airline_data # COMMAND ---------- # # 8 - Write airplane data (managed hive table) # # drop table sql_stmt = "DROP TABLE IF EXISTS sparktips.mt_airplane_data" spark.sql(sql_stmt) # grab data frame df5 = spark.sql("select * from tmp_plane_data") # write as managed table df5.repartition(1).write.format("delta").saveAsTable("sparktips.mt_airplane_data") # COMMAND ---------- # MAGIC %fs # MAGIC ls dbfs:/user/hive/warehouse/sparktips.db/mt_airplane_data # COMMAND ---------- # # 9 - Write airport data (managed hive table) # # drop table sql_stmt = "DROP TABLE IF EXISTS sparktips.mt_airport_data" spark.sql(sql_stmt) # grab data frame df6 = spark.sql("select * from tmp_airport_codes") # write as managed table df6.repartition(1).write.format("delta").saveAsTable("sparktips.mt_airport_data") # COMMAND ---------- # MAGIC %fs # MAGIC ls dbfs:/user/hive/warehouse/sparktips.db/mt_airport_data # COMMAND ---------- # COMMAND ---------- # # 10 - Create array to hold results # # empty list results = [] # COMMAND ---------- # # 10A1 - Write airline data (arvro) # delete_dir("/lake2022/bronze/avro") df4.write.format("avro").partitionBy("FltHash").save("/lake2022/bronze/avro") # COMMAND ---------- # # 10A2 - get file list (avro) # info = update_file_dict({'type': 'avro'}, get_file_info("/lake2022/bronze/avro")) results.append(info) print(info) # COMMAND ---------- # # 10B1 - Write airline data (orc) # delete_dir("/lake2022/bronze/orc") df4.write.format("orc").partitionBy("FltHash").save("/lake2022/bronze/orc") # COMMAND ---------- # # 10B2 - get file list (orc) # info = update_file_dict({'type': 'orc'}, get_file_info("/lake2022/bronze/orc")) results.append(info) print(info) # COMMAND ---------- # # 10C1 - Write airline data (parquet) # delete_dir("/lake2022/bronze/parquet") df4.write.format("parquet").partitionBy("FltHash").save("/lake2022/bronze/parquet") # COMMAND ---------- # # 10C2 - get file list (parquet) # info = update_file_dict({'type': 'parquet'}, get_file_info("/lake2022/bronze/parquet")) results.append(info) print(info) # COMMAND ---------- # # 10D1 - Write airline data (json) # delete_dir("/lake2022/bronze/json") df4.write.format("json").partitionBy("FltHash").save("/lake2022/bronze/json") # COMMAND ---------- # # 10D2 - get file list (json) # info = update_file_dict({'type': 'json'}, get_file_info("/lake2022/bronze/json")) results.append(info) print(info) # COMMAND ---------- # # 10E1 - Write airline data (csv) # delete_dir("/lake2022/bronze/csv") df4.write.format("csv").partitionBy("FltHash").save("/lake2022/bronze/csv") # COMMAND ---------- # # 10E2 - get file list (csv) # info = update_file_dict({'type': 'csv'}, get_file_info("/lake2022/bronze/csv")) results.append(info) print(info) # COMMAND ---------- # # 10F1 - Write airline data (delta) # delete_dir("/lake2022/bronze/delta") df4.write.format("delta").partitionBy("FltHash").save("/lake2022/bronze/delta") # COMMAND ---------- # # 10F2 - get file list (delta) # info = update_file_dict({'type': 'delta'}, get_file_info("/lake2022/bronze/delta")) results.append(info) print(info) # COMMAND ---------- print(results) # COMMAND ---------- # COMMAND ---------- # # 11 - create hive tables from files # # https://blog.knoldus.com/dynamic-partitioning-in-apache-hive/ # COMMAND ---------- # MAGIC %sql # MAGIC set hive.exec.dynamic.partition=true # MAGIC set hive.exec.dynamic.partition.mode=nonstrict # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged avro hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_avro_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_avro_airline_data # MAGIC ( # MAGIC FltYear int, # MAGIC FltMonth int, # MAGIC FltDay int, # MAGIC DepTime string, # MAGIC ArrTime string, # MAGIC FlightNum int, # MAGIC TailNum string # MAGIC ) # MAGIC USING AVRO # MAGIC PARTITIONED BY (FltHash int) # MAGIC LOCATION '/lake2022/bronze/avro'; # MAGIC # MAGIC -- Register partitions # MAGIC MSCK REPAIR TABLE sparktips.umt_avro_airline_data; # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged orc hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_orc_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_orc_airline_data # MAGIC ( # MAGIC FltYear int, # MAGIC FltMonth int, # MAGIC FltDay int, # MAGIC DepTime string, # MAGIC ArrTime string, # MAGIC FlightNum int, # MAGIC TailNum string # MAGIC ) # MAGIC USING ORC # MAGIC PARTITIONED BY (FltHash int) # MAGIC LOCATION '/lake2022/bronze/orc'; # MAGIC # MAGIC -- Register partitions # MAGIC MSCK REPAIR TABLE sparktips.umt_orc_airline_data; # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged parquet hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_parquet_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_parquet_airline_data # MAGIC ( # MAGIC FltYear int, # MAGIC FltMonth int, # MAGIC FltDay int, # MAGIC DepTime string, # MAGIC ArrTime string, # MAGIC FlightNum int, # MAGIC TailNum string # MAGIC ) # MAGIC USING PARQUET # MAGIC PARTITIONED BY (FltHash int) # MAGIC LOCATION '/lake2022/bronze/parquet'; # MAGIC # MAGIC -- Register partitions # MAGIC MSCK REPAIR TABLE sparktips.umt_parquet_airline_data; # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged json hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_json_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_json_airline_data # MAGIC ( # MAGIC FltYear int, # MAGIC FltMonth int, # MAGIC FltDay int, # MAGIC DepTime string, # MAGIC ArrTime string, # MAGIC FlightNum int, # MAGIC TailNum string # MAGIC ) # MAGIC USING JSON # MAGIC PARTITIONED BY (FltHash int) # MAGIC LOCATION '/lake2022/bronze/json'; # MAGIC # MAGIC -- Register partitions # MAGIC MSCK REPAIR TABLE sparktips.umt_json_airline_data; # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged csv hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_csv_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_csv_airline_data # MAGIC ( # MAGIC FltYear int, # MAGIC FltMonth int, # MAGIC FltDay int, # MAGIC DepTime string, # MAGIC ArrTime string, # MAGIC FlightNum int, # MAGIC TailNum string # MAGIC ) # MAGIC USING CSV # MAGIC PARTITIONED BY (FltHash int) # MAGIC LOCATION '/lake2022/bronze/csv'; # MAGIC # MAGIC -- Register partitions # MAGIC MSCK REPAIR TABLE sparktips.umt_csv_airline_data; # COMMAND ---------- # MAGIC %sql # MAGIC # MAGIC -- # MAGIC -- Unmanaged delta hive table # MAGIC -- # MAGIC # MAGIC -- Drop existing # MAGIC DROP TABLE IF EXISTS sparktips.umt_delta_airline_data; # MAGIC # MAGIC -- Create new # MAGIC CREATE EXTERNAL TABLE sparktips.umt_delta_airline_data # MAGIC USING DELTA # MAGIC LOCATION '/lake2022/bronze/delta'; # COMMAND ---------- # COMMAND ---------- # # 12 - count rows by partition # # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_avro_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_orc_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_parquet_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_json_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_csv_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) from sparktips.umt_delta_airline_data where flthash = 200001 # COMMAND ---------- # MAGIC %sql # MAGIC select count(*) as total from tmp_airline_data where Year = 2000 and Month = 1 # COMMAND ---------- # COMMAND ---------- # MAGIC %sql # MAGIC select TailNum, count(*) as Total from sparktips.mt_airline_data group by TailNum order by Total desc limit 10 # COMMAND ---------- # # 13 - flight for 1 month # # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_avro_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_csv_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_orc_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_json_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_delta_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_parquet_airline_data # MAGIC where flthash = 200001 and TailNum is not null # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select Year, Month, TailNum, count(*) as Total # MAGIC from tmp_airline_data # MAGIC where Year = 2000 and Month = 1 and TailNum is not null # MAGIC group by Year, Month, TailNum # COMMAND ---------- # COMMAND ---------- # # 14 - flight by month for 3 planes # # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.mt_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_avro_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_orc_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_parquet_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_json_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_csv_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # MAGIC %sql # MAGIC select FltHash, TailNum, count(*) as Total # MAGIC from sparktips.umt_delta_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by FltHash, TailNum # COMMAND ---------- # COMMAND ---------- # MAGIC %sql # MAGIC select Year, Month, TailNum, count(*) as Total # MAGIC from tmp_airline_data # MAGIC where TailNum in ('N528', 'N526', 'N525') # MAGIC group by Year, Month, TailNum # COMMAND ----------