This article is part one in a series titled "Building Data Pipelines with Python".
Storage is cheap and easy, so data is everywhere.
But while storing data is easy, organizing it is challenging, and analysis and consumption cannot begin until data is aggregated and massaged into compatible formats. These challenges grow more difficult as your dataset grows and as your needs approach the fabled "real time" status.
In this series, we'll talk about how Python can be leveraged to collect data from many sources, organize it, standardize it for analysis and consumption, and parallelize the work to scale with volume.
There is no shortage of varying ways to move data from one system to another. It's a problem almost as old as computing itself. And while there are a number of ideologies, some more formalized than others, regarding the best practices for solving this problem, we're going to focus on what is likely the most documented and applied one: the process of ETL.
Let's jump right into designing an ETL workflow.
What is ETL?
- ETL stands for Extract (or Expand), Transform, Load
- Extraction, Transformation, and Loading should be isolated, independent processes. As such, after each process completes, its output needs to be stored in an intermediate storage format.
- Extraction is the process by which data is retrieved from source files/warehouses.
- Transformation is where all calculations, type conversions, etc., are performed.
- Loading is the process by which transformed data is sent to output, or target files/warehouses.
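The isolation of the three steps above can be sketched in miniature. This is an illustrative skeleton, not a prescribed API -- the function names and the JSON handoff are assumptions for the sketch:

```python
import json

def extract(sources):
    """Pull raw records from each source into one common format."""
    records = []
    for source in sources:
        records.extend(source)      # real code would read files, DBs, APIs
    return json.dumps(records)      # intermediate storage: a JSON document

def transform(staged):
    """Apply conversions and calculations; runs independently of extract."""
    records = json.loads(staged)
    return [{k: str(v).upper() for k, v in r.items()} for r in records]

def load(transformed, target):
    """Write transformed records to the target store."""
    target.extend(transformed)

# Each stage reads only the previous stage's output, never its internals.
target = []
staged = extract([[{"city": "fayetteville"}], [{"city": "tulsa"}]])
load(transform(staged), target)
```

Because each stage communicates only through intermediate storage, any stage can fail and be re-run without touching the others.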
Let's take a look at each step of the ETL workflow and discuss how they should be designed.
Extract / Expand
The first step to creating an extraction process is identifying the various formats your source data is in. For small pipelines, this might be a single source format (a proprietary Oracle database, for example). That said, in larger pipelines it's not at all uncommon for source data to be warehoused in a disparate set of formats (Excel spreadsheets, RDBs, or maybe even data exposed for collection through an HTTP API).
Identifying sources up front helps you to understand how flexible your extraction process should be. If there's a good chance data will be arriving in new formats in the future, think about how some abstraction can be leveraged to create interfaces between outside data formats and your transform process.
The goal of the extract process is to aggregate incoming data into a common warehouse, or intermediate storage, for the rest of ETL to access. Again, this is not the place to perform conversions or calculations--you're simply pulling together source data in a consistent format.
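One way to build that abstraction is a small extractor interface: each source format gets its own class, and the rest of the pipeline only ever sees the common output. The class names and dict-based common format here are assumptions for the sketch:

```python
import abc
import csv
import io
import json

class Extractor(abc.ABC):
    """Interface between one outside data format and intermediate storage."""

    @abc.abstractmethod
    def extract(self):
        """Return a list of dicts in the pipeline's common format."""

class CSVExtractor(Extractor):
    def __init__(self, fileobj):
        self.fileobj = fileobj

    def extract(self):
        return list(csv.DictReader(self.fileobj))

class JSONExtractor(Extractor):
    def __init__(self, fileobj):
        self.fileobj = fileobj

    def extract(self):
        return json.load(self.fileobj)

# A new source format only requires a new Extractor subclass;
# the aggregation loop below never changes.
sources = [
    CSVExtractor(io.StringIO("id,name\n1,widget\n")),
    JSONExtractor(io.StringIO('[{"id": "2", "name": "gadget"}]')),
]
staged = [row for source in sources for row in source.extract()]
```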
Potential Formats for Intermediate Storage
JSON is an excellent representation for extracted data because it stores very little datatype information (the main exception being strings, which are quoted in JSON syntax). This is ideal because it obligates the upcoming transform process to make deterministic decisions about the output format of your data.
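One way to keep extracted JSON fully type-agnostic is to serialize every value as a string, so the transform step alone decides final datatypes. The field names here are made up for illustration:

```python
import json

# Write every extracted value out as a string so that the transform
# step, not the extract step, decides the final datatypes.
raw = {"well_id": 42, "depth_ft": "8100.5", "active": True}
staged = json.dumps({k: str(v) for k, v in raw.items()})

# The transform process later reads the staged document and converts
# each field deterministically (int, float, bool, ...).
record = json.loads(staged)
```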
CSV is another common option for your extracted data. One major reason for selecting CSV as a storage format for extracted data is the way it will be consumed during the transform process. Pandas has excellent support for loading CSV files into its DataFrame format, and if transformation is going to involve performing aggregations and calculations across broad cross sections of an extracted dataset, the ability to leverage those DataFrames during transformation is going to be crucial.
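A small sketch of that handoff, assuming pandas is installed: passing `dtype=str` to `read_csv` keeps the extract layer datatype-agnostic, and the transform step makes the conversions. The column names are invented for the example:

```python
import io
import pandas as pd

# dtype=str keeps the extracted CSV datatype-agnostic; the transform
# step then makes the deterministic conversions (to_numeric, etc.).
csv_data = io.StringIO("well_id,depth_ft\n1,8100.5\n2,7650.0\n")
df = pd.read_csv(csv_data, dtype=str)

# Transform-time decision: depth becomes a float for aggregation.
df["depth_ft"] = pd.to_numeric(df["depth_ft"])
avg_depth = df["depth_ft"].mean()
```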
Finally, a more traditional relational database can be used to store extracted data. This is somewhat less ideal in that you will have to force a schema on the extracted data. I would advise storing all of the data as varchar fields or something similar to emulate the datatype-agnostic aspect of the above formats. On the other hand, if the majority of your source data is in an RDB format, this can be an advantage, as your Extract process can be a thin Python wrapper around any number of easily configurable database link applications. (oracle_fdw has been an absolute life saver for us at Lofty Labs for getting Oracle data into Postgres.)
Transform
Transformation is where you'll spend most of your development and maintenance time in an ETL process. There are many types of operations that might need to be performed during transformation, most of them very specific to the actual domain of your data.
Datatype conversion is the baseline operation happening here, but other common operations include:
- Unit conversion (metric to English, etc)
- Derived calculations (operating on one or more source attributes to derive a new one)
- Hydrating data via external resources (Geocoding against an API, etc)
No-one can tell you how best to transform your data, as it is specific to your business cases. That said, there are some good guidelines to follow.
Transform on a Per-Item Basis
The code you write for transformation should operate on a per-item basis. That is, a transformer accepts as input a single extracted item and returns as output a single transformed item. Because transformation is likely to be the most computationally expensive part of your ETL workflow, it stands to benefit the most from scaling and parallelization. Especially consider any external data hydration your dataset needs--if it takes no less than 2 seconds to geocode an address in your dataset, you are fundamentally bound to a maximum transformation throughput of (number of transformation threads / 2) transforms/s, and that's in a hypothetical vacuum where you've optimized the rest of your transformation process to a negligible CPU load.
The good news is that CPUs are cheap these days, and building your transformation process for parallelization makes it easy to throw threads at high computation / high wait processes to increase throughput.
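A per-item transformer parallelizes almost for free with a thread pool. In this sketch the geocoder is a stand-in stub (a real one would call an external API), and the field names are invented:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def geocode(address):
    """Stand-in for an external geocoding API call (high wait, low CPU)."""
    time.sleep(0.01)            # simulate network latency
    return (36.06, -94.16)      # fixed coordinates for the sketch

def transform_item(item):
    """Per-item transformer: one extracted dict in, one transformed dict out."""
    lat, lon = geocode(item["address"])
    return {**item, "lat": lat, "lon": lon}

extracted = [{"address": f"{n} Main St"} for n in range(20)]

# Because each transform touches only its own item, throwing threads
# at the wait-bound geocoding step multiplies throughput.
with ThreadPoolExecutor(max_workers=10) as pool:
    transformed = list(pool.map(transform_item, extracted))
```

Threads suit wait-bound work like this; for CPU-bound transforms, a process pool (`ProcessPoolExecutor`) avoids contention on the interpreter.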
Intermediate storage for transformed data requires a more sophisticated storage engine than on the extraction side. This is because transformed data is now opinionated about datatype. The biggest influence on your intermediate storage of transformed data is the final format of your data pipeline. The pickle module can shine here. Your pickled objects can then be stored in any number of ways (Redis, memcached, etc.) depending on how much storage durability you're looking for, and unpickled back to native Python types by the load process.
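The pickle round trip is short enough to show in full. The record fields are invented; the bytes produced could be handed to Redis, memcached, or a file, depending on the durability you need:

```python
import pickle

# After transformation, records carry concrete Python datatypes
# (int, float, bool) that must survive intermediate storage intact.
transformed = {"well_id": 42, "depth_ft": 8100.5, "active": True}

blob = pickle.dumps(transformed)   # bytes, ready for Redis/memcached/disk
restored = pickle.loads(blob)      # native Python types again, in the loader
```

One caveat worth knowing: pickle is Python-specific and unsafe to load from untrusted sources, so it only fits when both sides of the handoff are your own Python processes.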
Load
The Load process of ETL is the mechanism by which our transformed data is inserted into the target data stores. As with the Extract process, a simple ETL pipeline might have a single output (say, a relational database for a web application), but more complicated pipelines could have many. For example, if your pipeline is processing large volumes of compressed video and tabular metadata about those videos, you might have two output streams: a filesystem where the video data lands and a database where the metadata is stored.
One common concern of ETL Load processes is that of idempotence. Idempotence is a property of certain operations whereby applying them multiple times yields the same result as applying them once.
In data pipelines, an idempotent Load process might be desired if you expect:
- Duplicate data might be pushed into the pipeline
- Updated data might be pushed into the pipeline
Again, this is specific to your use case, but one of those two situations is very likely to arise in almost every application. Because Extract, Transform, and Load have been developed as standalone processes, you have to account for the possibility that one of them fails, and you want to be able to re-run ETL against the same data if that happens without creating duplication in your target data stores.
In the case of updated data, building idempotence into your Load process is a handy way to apply updates without duplication and without any structural change to your source data formats. So long as you have a way of distinguishing when data has been updated, your ETL workflow can extract only new data from your sources (last modified time in the past 24 hours, for example), re-run transformations, and update existing records rather than creating new ones.
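With a relational target, an upsert keyed on the primary key is one way to get that idempotence. This sketch uses SQLite's `ON CONFLICT` upsert (available in SQLite 3.24+); the table and column names are invented:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wells (well_id INTEGER PRIMARY KEY, depth_ft REAL)")

def load(records):
    """Idempotent load: re-running on the same data updates rows in place."""
    conn.executemany(
        "INSERT INTO wells (well_id, depth_ft) VALUES (:well_id, :depth_ft) "
        "ON CONFLICT (well_id) DO UPDATE SET depth_ft = excluded.depth_ft",
        records,
    )

batch = [{"well_id": 1, "depth_ft": 8100.5}]
load(batch)   # first run inserts the row
load(batch)   # re-run after a failure: same row, no duplicate
```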
Relationship Discovery is a process that is relatively isolated to target data stores which are relational databases. A major drawback of RDBs for ETL pipelines is that you cannot create relationships between rows until both rows exist and have been assigned a primary key. This problem is compounded by the concept of parallelized Transformations, especially in a real-time or near-real-time data pipeline. Take a situation where Record A has a foreign key relationship with Record B: there is no way to know whether or not Record B has gone through the load process yet when Record A is created.
There are two main approaches to Relationship Discovery:
- Stub out dummy related records during the load process, and rely on loader idempotence to clean up the mess
- Run relationship discovery as a periodic or synchronous process to connect items that should be related
Both approaches have advantages and disadvantages.
If the load process stubs out dummy relations, your target database schema must allow most of the data on related records to be nullable. That way a row can be created with a single non-null column containing enough data for the loader to find and update the related record after it is transformed--but it leaves your database with a weak structure that allows data integrity problems.
Running relationship discovery as a periodic process is potentially troublesome as well, since it leaves relationships null in your target datastore until the process runs. And even then, the periodic process will only create the relation if both related items have completed transformation and loading. The severity of this problem depends on the throughput constraints (performance or otherwise) that dictate how often relationship discovery can run, and on how sparse your pipeline is (running relationship discovery every 5 minutes doesn't help much if it takes an hour after Record A is loaded for Record B to get loaded).
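The periodic approach can be as simple as one UPDATE that connects any still-null foreign keys whose other side has since loaded. The schema here is invented: samples reference wells by a shared natural key (`api_number`), and the foreign key stays null until discovery runs:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wells (id INTEGER PRIMARY KEY, api_number TEXT)")
conn.execute(
    "CREATE TABLE samples (id INTEGER PRIMARY KEY, api_number TEXT, "
    "well_id INTEGER)"   # the FK stays NULL until discovery runs
)

# A sample loads before its well exists -- no relation possible yet.
conn.execute("INSERT INTO samples (id, api_number) VALUES (1, 'A-100')")
conn.execute("INSERT INTO wells (id, api_number) VALUES (7, 'A-100')")

def discover_relationships():
    """Periodic job: connect samples to wells once both sides have loaded."""
    conn.execute(
        "UPDATE samples SET well_id = "
        "(SELECT id FROM wells WHERE wells.api_number = samples.api_number) "
        "WHERE well_id IS NULL"
    )

discover_relationships()
```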
For what it's worth, my preference leans towards the latter option, as I'd rather see a foreign key be null for an hour than a required column be allowed null for a lifetime at the database level.
This should be enough to get you started on drafting your workflow for a data pipeline. A great deal of planning and architecture goes into a large data pipeline, and most of it is high-level conceptualization of how your data needs to flow from source formats to target formats. As such, we've focused on conceptual models rather than on a full implementation.
In part two of this series, we'll start looking at Python implementations of this process, complete with sample code that can be used to begin generating your own ETL pipelines. Stay tuned!