Read, Enrich and Transform Data with AWS Glue Service


By: Maria Zakourdaev   |   Last Updated: 2019-03-14

Problem

In the first part of this tip series we looked at how to map and view JSON files with the Glue Data Catalog. In this second part, we will look at how to read, enrich and transform the data using an AWS Glue job.

Solution

Please read the first tip about mapping and viewing JSON files in the Glue Data Catalog: Import JSON files to AWS RDS SQL Server database using Glue service.


Read, Enrich and Transform Data with AWS Glue Service

In this part, we will create an AWS Glue job that uses an S3 bucket as the source and an Amazon RDS for SQL Server database as the target. We will use a JSON lookup file to enrich our data during the AWS Glue transformation. The job will use the job bookmark feature to load every new file that lands in the S3 source bucket.

Let's start the job wizard and configure the job properties:

configure job properties

Enter the job name and an IAM role that has permissions on the S3 buckets and on our Amazon RDS database. I chose Python as the ETL language.

If you want to track processed files and load only new ones, make sure to enable the job bookmark. Note that job bookmarking will identify only new files and will not reload modified files.

Advanced Properties for AWS Glue Job

Bookmarks are maintained per job. If you delete the job, its bookmark is deleted as well, and a new job will start by processing all files in the bucket.

When you run a job manually, you will be asked whether to keep the job bookmark enabled or to disable it and process all files.
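To make the bookmark behaviour described above concrete, here is a small plain-Python model of it. It is a hypothetical sketch, not AWS Glue's implementation: the class and function names are made up, and a bookmark is reduced to a set of already-processed file names. In the real job, bookmark state is tied to each source's transformation_ctx value and is saved when the script calls job.commit().

```python
# Hypothetical, simplified model of a job bookmark: it remembers which S3 object
# keys a job has already loaded. This is NOT AWS Glue's implementation.


class JobBookmark:
    """Tracks processed file keys for one job (bookmarks are kept per job)."""

    def __init__(self):
        self.processed = set()

    def new_files(self, bucket_keys):
        # Return only keys that have never been processed, in listing order.
        return [key for key in bucket_keys if key not in self.processed]

    def commit(self, keys):
        # Plays the role of job.commit(): record what this run loaded.
        self.processed.update(keys)


def run_job(bookmark, bucket_keys, bookmark_enabled=True):
    """Return the keys a run would load; a disabled bookmark means 'load everything'."""
    if not bookmark_enabled:
        # In this sketch: process all files and leave the saved state untouched.
        return list(bucket_keys)
    to_load = bookmark.new_files(bucket_keys)
    bookmark.commit(to_load)
    return to_load


bookmark = JobBookmark()
print(run_job(bookmark, ["flights_1.json", "flights_2.json"]))
# ['flights_1.json', 'flights_2.json']
print(run_job(bookmark, ["flights_1.json", "flights_2.json", "flights_3.json"]))
# ['flights_3.json']
print(run_job(bookmark, ["flights_1.json", "flights_2.json", "flights_3.json"], bookmark_enabled=False))
# ['flights_1.json', 'flights_2.json', 'flights_3.json']
```

Creating a fresh JobBookmark plays the role of deleting and recreating the job: the next run loads every file again.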

In the job parameters you can change the number of concurrent DPUs (data processing units) per job run, which affects how fast the job runs, as well as the maximum number of concurrent runs of this job, the job timeout and many other settings. In this example I haven't changed any of these parameters.

Add the SQL Server destination connection (read Serverless ETL using AWS Glue for RDS databases for a step-by-step tutorial on how to add a JDBC database connection). The S3 source will be set up in our script.

aws connections

After you click "Save job and edit script", you will be taken to the Python script editor.

Here is a script that supports our requirements. Let's go through it step by step:

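Create a DynamicFrame from the aws_glue_maria table in the datalakedb Glue Data Catalog database. This table was built over the S3 bucket in part 1 of this tip. The transformation_ctx value ("datasource0") identifies this source's state in the job bookmark.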

flights_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "aws_glue_maria", transformation_ctx = "datasource0")

The file looks as follows:

json file contents

Create another DynamicFrame from a second Data Catalog table, carriers_json, which is built over a lookup file located on S3. Use the same steps as in part 1 to add more tables or lookups to the Glue Data Catalog. I will use this file to enrich our dataset. The file looks as follows:

json file contents
carriers_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "carriers_json", transformation_ctx = "datasource1")

I will join the two datasets using the Join.apply transform, whose arguments are (frame1, frame2, join column in frame1, join column in frame2):

joined_data = Join.apply(flights_data,carriers_data,'carrier','carrier')
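The effect of joining the flights and carriers data on the carrier key can be illustrated with plain Python lists of dictionaries. This is a sketch only: the sample rows are made up, the helper name equality_join is hypothetical, and it assumes inner-join semantics, so a flight whose carrier code is missing from the lookup file is dropped.

```python
# Plain-Python illustration of an equality join on the "carrier" key.
# It is not how AWS Glue's Join transform is implemented.


def equality_join(left_rows, right_rows, left_key, right_key):
    # Index the (small) lookup side by its key so each match is one dict lookup.
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)

    joined = []
    for left in left_rows:
        # Rows without a match produce nothing (inner-join assumption).
        for right in index.get(left[left_key], []):
            merged = dict(right)
            merged.update(left)  # left-side values win on column-name clashes
            joined.append(merged)
    return joined


# Made-up sample rows, for illustration only.
flights = [
    {"flight": 1545, "carrier": "UA", "origin": "EWR"},
    {"flight": 1141, "carrier": "AA", "origin": "JFK"},
    {"flight": 725, "carrier": "ZZ", "origin": "JFK"},  # no matching carrier
]
carriers = [
    {"carrier": "UA", "carrier_name": "United Air Lines Inc."},
    {"carrier": "AA", "carrier_name": "American Airlines Inc."},
]

for row in equality_join(flights, carriers, "carrier", "carrier"):
    print(row)
```

Indexing the lookup side first means each flight row is matched with a single dictionary lookup instead of a scan of the whole carriers list.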

Map the columns. Each mapping is a (source column, source type, target column, target type) tuple. I want to keep both the carrier column and the new carrier_name column from the lookup file:

applymapping1 = ApplyMapping.apply(frame = joined_data, mappings = [
    ("year", "int", "year", "int"),
    ("month", "int", "month", "int"),
    ("day", "int", "day", "int"),
    ("dep_time", "int", "dep_time", "int"),
    ("dep_delay", "int", "dep_delay", "int"),
    ("arr_time", "int", "arr_time", "int"),
    ("arr_delay", "int", "arr_delay", "int"),
    ("carrier", "string", "carrier", "string"),
    ("carrier_name", "string", "carrier_name", "string"),
    ("tailnum", "string", "tailnum", "string"),
    ("flight", "int", "flight", "int"),
    ("origin", "string", "origin", "string"),
    ("dest", "string", "dest", "string"),
    ("air_time", "int", "air_time", "int"),
    ("distance", "int", "distance", "int"),
    ("hour", "int", "hour", "int"),
    ("minute", "int", "minute", "int"),
    ("part_col", "string", "part_col", "string")
], transformation_ctx = "applymapping1")
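Because each tuple reads as (source column, source type, target column, target type), a column can be renamed or cast by changing the last two values. The plain-Python sketch below shows that idea on a single made-up row. The names apply_mapping and CASTS are hypothetical, only "int" and "string" are handled, and in this sketch any column not listed in the mappings is left out of the output.

```python
# Hypothetical stand-in for what a mapping list describes. Each tuple is
# (source_column, source_type, target_column, target_type). Not Glue's code.

CASTS = {"int": int, "string": str}


def apply_mapping(rows, mappings):
    out = []
    for row in rows:
        new_row = {}
        for src, _src_type, dst, dst_type in mappings:
            value = row.get(src)
            # Keep missing values as None instead of failing the cast.
            new_row[dst] = None if value is None else CASTS[dst_type](value)
        out.append(new_row)  # unlisted columns never reach new_row
    return out


mappings = [
    ("year", "int", "year", "int"),
    ("carrier", "string", "carrier", "string"),
    ("carrier_name", "string", "carrier_name", "string"),
    ("flight", "int", "flight", "int"),
]
row = {"year": "2013", "carrier": "UA", "carrier_name": "United Air Lines Inc.",
       "flight": 1545, "extra_field": "ignored"}
print(apply_mapping([row], mappings))
# [{'year': 2013, 'carrier': 'UA', 'carrier_name': 'United Air Lines Inc.', 'flight': 1545}]
```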

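Finally, write the result to SQL Server through the TestDataLakeDB Glue connection into a target table named flights_enriched; the table will be created if it does not exist. Note that the frame written here is dropnullfields3, which comes from the ResolveChoice and DropNullFields steps that run after the mapping in the full script below.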

datalake_dest = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TestDataLakeDB", connection_options = {"dbtable": "flights_enriched", "database": "datalakedb"}, transformation_ctx = "datalake_dest")
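The "create the table if it does not exist, then insert" behaviour of the write step can be tried locally with Python's built-in sqlite3 module. SQLite stands in for SQL Server here, the column list is trimmed to three columns, the SQL types are assumptions, and the sketch assumes each run appends its rows to the existing table, which is what lets bookmarked runs add only the data from new files.

```python
# Local stand-in for the write step using sqlite3 instead of SQL Server.
# Table name follows the article; column types are assumptions for illustration.
import sqlite3


def write_rows(conn, table, rows):
    # The table name is a trusted constant here; never build SQL from untrusted input.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {table} "
        "(carrier TEXT, carrier_name TEXT, flight INTEGER)"
    )
    conn.executemany(
        f"INSERT INTO {table} (carrier, carrier_name, flight) VALUES (?, ?, ?)",
        [(r["carrier"], r["carrier_name"], r["flight"]) for r in rows],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
batch = [{"carrier": "UA", "carrier_name": "United Air Lines Inc.", "flight": 1545}]
write_rows(conn, "flights_enriched", batch)  # first run creates the table
write_rows(conn, "flights_enriched", batch)  # later runs append to it
print(conn.execute("SELECT COUNT(*) FROM flights_enriched").fetchone()[0])  # 2
```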

Here is the full script the Glue job will execute:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
 
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
 
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
 
# Source: flights data from the Data Catalog table built over the S3 bucket
flights_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "aws_glue_maria", transformation_ctx = "datasource0")
# Lookup: carrier codes and names, also catalogued over a file on S3
carriers_data = glueContext.create_dynamic_frame.from_catalog(database = "datalakedb", table_name = "carriers_json", transformation_ctx = "datasource1")
# Enrich: join the two frames on the carrier column
joined_data = Join.apply(flights_data, carriers_data, 'carrier', 'carrier')

# Select and type the output columns: (source column, source type, target column, target type)
applymapping1 = ApplyMapping.apply(frame = joined_data, mappings = [
    ("year", "int", "year", "int"),
    ("month", "int", "month", "int"),
    ("day", "int", "day", "int"),
    ("dep_time", "int", "dep_time", "int"),
    ("dep_delay", "int", "dep_delay", "int"),
    ("arr_time", "int", "arr_time", "int"),
    ("arr_delay", "int", "arr_delay", "int"),
    ("carrier", "string", "carrier", "string"),
    ("carrier_name", "string", "carrier_name", "string"),
    ("tailnum", "string", "tailnum", "string"),
    ("flight", "int", "flight", "int"),
    ("origin", "string", "origin", "string"),
    ("dest", "string", "dest", "string"),
    ("air_time", "int", "air_time", "int"),
    ("distance", "int", "distance", "int"),
    ("hour", "int", "hour", "int"),
    ("minute", "int", "minute", "int"),
    ("part_col", "string", "part_col", "string")
], transformation_ctx = "applymapping1")

# Split any column with ambiguous types into one column per type
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")

# Drop fields that are null in every record
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

# Write to SQL Server through the TestDataLakeDB connection; flights_enriched is created if missing
datalake_dest = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TestDataLakeDB", connection_options = {"dbtable": "flights_enriched", "database": "datalakedb"}, transformation_ctx = "datalake_dest")
job.commit()
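The script runs two transforms between the mapping and the write: ResolveChoice with choice = "make_cols", which splits a column whose values arrive with more than one type into columns named column_type (for example dep_delay_int and dep_delay_string), and DropNullFields, which removes fields that are null in every record. The plain-Python model below illustrates both on made-up rows; the function names and the TYPE_NAMES table are hypothetical, and this is not Glue's implementation.

```python
# Hypothetical, simplified model of ResolveChoice(choice="make_cols") followed by
# DropNullFields, on rows represented as dictionaries. Not Glue's implementation.

TYPE_NAMES = {int: "int", str: "string", float: "double"}


def resolve_make_cols(rows):
    # Collect the set of non-null value types seen for every column.
    seen = {}
    for row in rows:
        for col, val in row.items():
            if val is not None:
                seen.setdefault(col, set()).add(TYPE_NAMES[type(val)])
    ambiguous = {col for col, types in seen.items() if len(types) > 1}

    out = []
    for row in rows:
        new_row = {}
        for col, val in row.items():
            if col in ambiguous:
                # One output column per observed type, named <column>_<type>.
                for type_name in sorted(seen[col]):
                    new_row[f"{col}_{type_name}"] = None
                if val is not None:
                    new_row[f"{col}_{TYPE_NAMES[type(val)]}"] = val
            else:
                new_row[col] = val
        out.append(new_row)
    return out


def drop_null_fields(rows):
    # Keep only columns that hold a non-null value in at least one row.
    non_null = {col for row in rows for col, val in row.items() if val is not None}
    return [{col: val for col, val in row.items() if col in non_null} for row in rows]


rows = [
    {"flight": 1545, "dep_delay": 2, "tailnum": None},
    {"flight": 1141, "dep_delay": "n/a", "tailnum": None},
]
print(drop_null_fields(resolve_make_cols(rows)))
# [{'flight': 1545, 'dep_delay_int': 2, 'dep_delay_string': None}, {'flight': 1141, 'dep_delay_int': None, 'dep_delay_string': 'n/a'}]
```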

Save the script and run the job.

Here is the result. This is how my JSON flights data looks:

json file contents

And here is how the carriers lookup file looks:

json file contents

And here is the data in the SQL Server table after the job has finished:

data in table
Next Steps





About the author
Maria Zakourdaev has been working with SQL Server for more than 15 years. She also manages other database technologies such as MySQL, PostgreSQL, Redis, Redshift, Couchbase and Elasticsearch.




Wednesday, May 29, 2019 - 2:27:44 PM - Jason H

re: "Can I import data from S3 to SQL in EC2 instance (not RDS) "

Yes. Any supported JDBC source that is accessible from your AWS VPC is a supported destination. You can set up an AWS Glue database connection in the Glue console (or through the API) by entering your database details, and then use it in your Glue ETL jobs.


Friday, April 12, 2019 - 10:48:35 AM - Umit Kavala

Can I import data from S3 to SQL in EC2 instance (not RDS) 

