In this guide, you’ll use Anypoint Platform to build a polling service that:
Anypoint Platform includes Mule runtime, a flexible execution engine that can implement integration patterns ranging from APIs to a traditional Enterprise Service Bus (ESB). It offers a variety of benefits when developing batch and ETL services, including the ability to:
The Anypoint Exchange also offers many pre-built examples and templates for popular systems such as SAP and Salesforce.
First, click the button below to sign up for a free Anypoint Platform account.
Already have an account? Sign in.
Next, download and install Anypoint Studio by clicking the link. Then sign up for a Salesforce Developer account and take note of your credentials; you'll need them later. Lastly, reset your security token for Salesforce. You will receive an email with your token, which you will also need in a later step.
To begin, launch Anypoint Studio and follow the steps below to:
Alternatively, if you are just looking for a pre-built example of a batch ETL integration with Salesforce, you can import the solution file into Anypoint Studio. To do so:
Unlike a service that needs to be exposed as an API – as shown in the second quick start guide on developing your first Mule application – this project’s application runs strictly as a scheduled nightly task. As a result, you’ll create a new Mule project without using an API specification.
From within Studio:
To detect whether files have been created or updated in a specific path on an FTP server, use the FTP connector, which polls for new or updated files. To add the FTP connector to your application canvas:
This creates the main flow of your application. It is triggered by any new or updated file that matches all the settings and conditions set in future steps.
As with most connectors, after adding it to the canvas, the first step is to configure the connector before configuring the operation.
The directory is at the root, so there is no need to change the path.
In this case, you'll configure the FTP connector's On New or Updated File operation to look for a specific file, enable watermarking, and, for development purposes, run every minute. Since this account is read-only, you won't be able to write or delete, so post-processing is set to false. That said, for many use cases it is a good idea to move each file to a different directory after processing to avoid duplication should you need to restart the service.
Enabling watermarking automatically tracks the timestamp at which each file was created or updated in the in-memory object store, ensuring that only files newer than the last fetch continue through the flow.
Fill in or select the following to configure the operation:
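If you'd like to check your work against the underlying configuration file, the generated Mule XML for the connection and listener should look roughly like the sketch below. The host, credentials, directory, and file name are placeholders for this example, and attribute names may vary slightly between connector versions.
<!-- Sketch only: host, credentials, and file name below are placeholders -->
<ftp:config name="FTP_Config">
  <ftp:connection host="ftp.example.com" username="anonymous" password="anonymous"/>
</ftp:config>

<flow name="ftp-to-salesforce-batch">
  <!-- On New or Updated File: poll every minute with watermarking enabled -->
  <ftp:listener config-ref="FTP_Config" directory="/" watermarkEnabled="true">
    <scheduling-strategy>
      <fixed-frequency frequency="1" timeUnit="MINUTES"/>
    </scheduling-strategy>
    <!-- only pick up the customer CSV file -->
    <ftp:matcher filenamePattern="contacts.csv"/>
  </ftp:listener>
  <!-- transform, batch job, and Salesforce upsert are added in the following steps -->
</flow>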
To make data transformations easier, you can set up metadata that defines the data structure of the file. DataSense then uses this metadata to help create visual mappings of the data. Anypoint Studio can derive the metadata by importing an example file. To create data structure metadata using an example file:
Since the file you've fetched is in CSV format, you'll need to convert it to a Plain Old Java Object (POJO) so that the Java-based connectors can use the data. To do so, you'll use the Transform Message component, which uses the powerful DataWeave language to transform data.
To add the transform message component, follow similar steps as the FTP connector:
%dw 2.0
output application/java // Output mime-type as POJO
---
// Convert entire payload variable which currently
// contains the CSV from the FTP connector
payload
Adding a batch job and setting the processing rules for records in Anypoint Studio is a simple process. Adding data aggregation to the batch process is also supported to easily enable bulk uploads at customizable record sizes.
Add the batch job to your main flow by:
With the batch job placed in the flow, you now have the option of configuring the scheduling strategy, number of records to process per thread, and more. To see the process in action, change the following:
Block size requires some consideration. When processing large records (e.g., images), it is possible to run out of memory (40 records x 10 MB x 16 threads = 6.4 GB of RAM); conversely, setting the block size too high for only a few records causes inefficient or effectively synchronous processing.
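As a rough sketch (names are illustrative), the batch job in the configuration XML ends up looking something like this, with the block size set on the job itself:
<!-- Sketch only: a batch job processing 40 records per block -->
<batch:job jobName="contactsBatchJob" blockSize="40">
  <batch:process-records>
    <batch:step name="upsertContactsStep">
      <!-- record-level processing (transform + aggregator) goes here -->
    </batch:step>
  </batch:process-records>
</batch:job>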
In order to create a bulk upsert of these customer records to Salesforce, you’ll need to aggregate a subset of the batch records in the queue. To add the Batch Aggregator scope:
The aggregator collects a subset of each batch block, so its size should be set equal to or less than the batch block size. For the purposes of this demo, configure it to the following:
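In the configuration XML, the aggregator is simply a scope inside the batch step whose size attribute controls how many records are grouped per bulk call. A minimal sketch, assuming a group size of 10:
<!-- Sketch only: group records in sets of 10 (less than the block size of 40) -->
<batch:aggregator size="10">
  <!-- the Salesforce bulk upsert goes inside this scope -->
</batch:aggregator>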
To save on API calls to Salesforce and avoid the lengthy process of inserting millions of records one at a time, the Salesforce connector includes a built-in bulk upsert operation. To add and configure the bulk upsert:
Configure the bulk upsert by doing the following:
The #[ ] syntax allows you to place DataWeave code inside a field to dynamically populate inputs. In this scenario, we are upserting the payload as set by the batch aggregator (an array of customers).
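For orientation only, the upsert operation sits inside the aggregator and receives the aggregated records through the #[payload] expression. The element and attribute names below (upsert-bulk, objectType, externalIdFieldName) are approximations; check the Salesforce connector reference for your connector version rather than copying this verbatim.
<!-- Approximation only: verify element and attribute names against the Salesforce connector reference -->
<batch:aggregator size="10">
  <salesforce:upsert-bulk config-ref="Salesforce_Config"
                          objectType="Contact"
                          externalIdFieldName="Id">
    <!-- the aggregated group of records produced by the batch aggregator -->
    <salesforce:records>#[payload]</salesforce:records>
  </salesforce:upsert-bulk>
</batch:aggregator>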
The batch step is where each record from the batch job gets any processing that might need to happen. You can include multiple chained batch steps for each of the records. For this example, there’s only one batch step where you’ll transform the CSV data structure to a Salesforce contact object:
%dw 2.0
output application/java
---
{
LastName: payload."Last Name",
FirstName: payload."First Name",
Salutation: payload."Salutation",
MailingStreet: payload."Mailing Street",
MailingCity: payload."Mailing City",
MailingState: payload."Mailing State/Province",
MailingPostalCode: payload."Mailing Zip/Postal Code",
MailingCountry: payload."Mailing Country",
Phone: payload."Phone",
Fax: payload."Fax",
MobilePhone: payload."Mobile",
Email: payload."Email",
Title: payload."Title"
}
The DataWeave code above is purely declarative: each CSV column is mapped directly to the corresponding field on the Salesforce Contact object. You can see the mappings visually in the DataSense view.
To see your application run, simply:
From within the console view that loads, you'll see the application run the entire batch operation on the first poll; on every subsequent one-minute poll it bypasses the batch because the watermark shows the file hasn't changed.
To see whether your batch process ran successfully, you can log into your Salesforce account and check the Bulk Data Load Jobs section in the Setup view. Here is how:
Your app is now running on your local machine and checks every minute. You can modify the On New or Updated File operation of your FTP connector to use a cron scheduling strategy so it performs the operation every night at 1 AM before sending the list of new contacts as a batch operation to Salesforce!
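For example, swapping the listener's fixed-frequency scheduler for a cron scheduling strategy (Quartz syntax; the time zone below is just an example) would trigger the poll at 1:00 AM each night:
<!-- Run every night at 1:00 AM instead of every minute -->
<scheduling-strategy>
  <cron expression="0 0 1 * * ?" timeZone="America/Los_Angeles"/>
</scheduling-strategy>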
If your application is ready to be deployed to CloudHub, you can do so quickly and easily from within Studio.