Is this your first time developing with MuleSoft? If so, please read our Hello Mule tutorial to learn how to build your first API/application in Anypoint Studio and deploy it to CloudHub.
With new advances in technology driving down the cost of storing and analyzing historical data, businesses are increasingly using the information to compile custom reports and dashboards in BI tools. As larger and more complex data sets are involved, data warehousing coupled with simple ETL/ELT integration patterns have grown in popularity as a way to extract insights from these large data sets. Mulesoft offers a simple and flexible way to implement batch and ETL/ELT services. This tutorial will guide you through the basics of setting up a Mule flow that extracts data from a CSV file and transforms it before bulk uploading to Snowflake.
This tutorial will guide you through the process for building a Batch ETL Mule Flow for Snowflake in Anypoint Studio. You can also download the pre-build Flow from Anypoint Exchange here.
Data warehousing is the secure and organized electronic storage of data, usually in large sizes, maintained by an organization for future reference. Common data warehouses include Amazon Redshift, Snowflake, Google BigQuery, IBM Db2, and Microsoft Azure Synapse among many others. Data warehousing serves 3 main functions: analytics to gain insight into a company’s operations, long term data storage for future reference (such as customer transaction records and receipts), and compliance with financial and data security requirements.
Traditionally, large data sets have been consumed using the ETL approach, which stands for “Extract, Transform, Load”. Data is extracted from a data source, transformed into the necessary format (such as a table with only specific values rather than the full data set) on a staging environment, and then loaded into a data warehouse for further dashboarding/analytics and visualization. ETL is useful when frequent updates to data are not needed, and users would like to save on computing costs by transforming small amounts of data in a staging server before pushing to the target. ETL is also most common when working with on-prem data storage solutions.
As cloud computing becomes more affordable, however, a new trend known as ELT is emerging. This stands for “Extract, Load, Transform” and allows users to directly transfer data from the source data warehouse to the destination tool before computing the transformations needed. This is most common with large amounts of data that may need frequent updates. You can learn more about ETL/ELT integration patterns in the following Mulesoft Whitepaper.
To get started, click the buttons below to create a free MuleSoft Anypoint Platform account and download Anypoint Studio.
Already have an account? Sign in.
Next download Anypoint Studio for FREE by clicking here
Now that you have set up your Anypoint Platform account and installed Anypoint Studio, we are ready to begin! The ETL process we will implement today creates a solution for the following scenario:
Danny works for Northern Trail Outfitters (NTO), a retailer of sports apparel, gear, and nutrition products. NTO has decided to send all of their customers a free t-shirt! NTO’s list of customers along with their shirt sizes currently resides in a CSV file, and the prices for each shirt size reside in a Snowflake Database. Danny has been tasked with creating a new table in Snowflake that contains the list of customers, their shirt sizes, and the price for each shirt so that management can get an idea of the total cost of this giveaway. But there is a catch! NTO is sold out of the 3XL size, so NTO notified and checked with their customers ahead of time and now any customer that is listed as a 3XL will need to be sent a 2XL shirt instead.
Let’s help Danny build an integration that extracts the list of customers and their shirt sizes from the CSV file, extracts the shirt prices from the Snowflake database, transforms 3XL sizes to 2XL, transforms the data to add shirt prices, combines the results into a single table, and loads the data into a new Snowflake database.
First we must configure Snowflake with a new warehouse and database that we will work with. This will contain the table of shirt price information, and the target table for your final data.
You have now created 2 tables in Snowflake, one called “customer_shirts” that will receive the processed data from the flow, and one called “shirt_prices” that looks like this:
Your Snowflake setup should now look like this:
We have completed the setup of Snowflake. The tutorial below will guide you through developing the Mule flow that will integrate with it. If you simply wish to try using the developed flow, you can visit Anypoint Exchange and search for “Snowflake Demo 2021” to download the completed flow file.
In the following steps, we will:
Unlike a service that needs to be exposed as an API, this project’s application runs strictly as an on-demand flow that transforms and uploads data from the CSV. As a result, you’ll create a new Mule project without using an API specification.
Open Anypoint Studio, and from within Studio
To detect if files in an SFTP location have been created or updated in a specific path, there is a connector available that polls for new or updated files on an SFTP server. To add the SFTP connector to your application canvas:
You have now set up the SFTP connector to check for new/updated files on the server once every 5 minutes, and read the data from any file beginning with “new_customer_shirts_small”. In this case the sample file that is already on the SFTP server is called “new_customer_shirts_small_07Jul2021.csv”. This will be passed into the flow as part of the payload.
Next we will convert the data extracted from the CSV to a JAVA object (POJO). This makes it easy for us to reference later during the flow.
The above will update the names of each of the columns from the CSV to a consistent format and convert the data set into a Java object.
This section demonstrates how Anypoint Platform can read from a Snowflake database to supply data to the Mule flow we are developing.
In order to provide Danny with the pricing of each shirt size, we must import the price table from Snowflake into the Mule flow:
3. From the Snowflake connector now in your Palette, drag and drop the “Select” action onto the canvas and place it inside the process after the “Transform Message” that you set up earlier.
4. You should now see a configuration pane for the Select action at the bottom of your Studio screen. You can configure it with the following settings:
You have now configured the Snowflake connector to retrieve the list of shirt prices and store it in the variable called shirt_prices_raw. Next we need to flatten the array that was returned by the select function to correctly map the key:value pairs as “shirt_size:shirt_price” as shown in the example below:
5. From the Mule palette, insert a “Set Variable” function after the Snowflake Select action on your canvas. In the configuration pane, name the variable “shirt_prices_flattened”, and select the function key to enable DataWeave programming for the variable value.
6. In the configuration pane for “Set Variable”, enter the following DataWeave code:
shirt_prices_flattened now contains the shirt price values from Snowflake, organized as key:value pairs “shirt_size:shirt_price”. Next we can use this data in our batch processing to create the final list of customers with their shirt sizes and prices.
In order to efficiently process the large amount of data row-by-row and append the correct shirt size and price to each entry, we must use a batch job with various data transformations.
A batch job consists of a process section (the steps the job should take with the data provided), and an on-complete section (what the job should do when it has finished processing all of the data).
The process section can include multiple batch steps, each of which can have its own set of processors (the actions the batch step should carry out) or its own aggregator (where processed entries are collected before moving onto the next step, such as uploading to Snowflake).
First let’s set up our batch job and configure the first batch step to update the ‘3XL’ shirt sizes to ‘2XL’:
Next, let’s configure our batch job to append shirt prices to each row in the data set:
4. Search the Mule palette for a “Batch Step” action, and insert another one after the existing batch step. Rename it to “Append_shirtPrice_to_Payload”.
5. Insert a Transform Message into the processor for this batch step, and configure it with the following DataWeave code:
Your flow should now look something like this:
Lastly we must aggregate the updated records and bulk upload them to Snowflake into the table that we created earlier (“customer_shirts”).
Now search the Mule palette for the Snowflake “Bulk Insert” action, and drag it into the Batch Aggregator. Ensure that the “input parameters” field says payload, and then insert the following query in to the “SQL Query Parameters” field:
INSERT INTO customer_shirts(id,lastname,firstname,email,company,shirtsize,shirtprice)
5. Lastly, we want the flow to tell us when it has finished processing. Search the Mule palette for a “Logger” action, and insert it into the On Complete section of the Batch Job.
Your flow should look like this:
Your application is now ready to deploy and users can begin to query the data from Snowflake for all their data warehousing use cases. This tutorial is available in video format, and more information on our Snowflake connector can be found on Anypoint Exchange.