We all know that data, like wine and cheese, becomes more valuable when combined. And, just like wine and cheese, it can lead to serious headaches. Whether you are emailing Excel files around, capturing data from thousands of IoT devices, or just joining your Google Analytics and sales data, you can benefit from following a structured process to minimize your headaches. After yet another pipeline debugging session I’ve come up with the following 8+3 model of 8 steps and 3 focus areas.

I will give you the TL;DR straight away before we go into the details. If you want to minimize future headaches, you should cover the following 8 steps for every data ingestion process.

  • Trigger: How and when does the pipeline start?
  • Connection: What are the requirements for connecting the source system or database to the target or destination?
  • State management: How do you keep track of what has and what has not changed since your last ingestion? How do you handle changes to the data over time?
  • Data Extraction: What is the actual process of extracting data? Do you query a database, access an API, or just read files?
  • Transformations: Is there any filtering, cleaning, reshaping, deduplication, or similar process that you need to apply to be able to move your data from the source to the destination?
  • Validation and Data Quality: How sure do you want to be that the data is correct? What kind of guarantees do you want to enforce and what kind of anomalies are you willing to accept?
  • Loading: Where do you store the data that you captured? How do you handle connecting and adjusting to your destination system?
  • Archiving and retention: How long should you store or keep the data you just captured? How do you handle expired data?

In addition to these steps, there are three big overarching areas that are relevant for the entire pipeline.

  • Monitoring and alerting: How do you make sure you are aware and able to debug when there is an issue anywhere in the ingestion process?
  • Governance: What kind of agreements do you have with both the source data provider as well as the destination owner? Are there any legal requirements you need to consider when handling this data?
  • Scalability and resilience: As the data grows over time, are you still able to handle the additional load? What is the fault tolerance of the entire pipeline and how often does it retry before giving up?

The source of all problems

The reason a data ingestion pipeline even needs to exist starts at the way source data is collected. Let’s for a moment consider ourselves an analyst in need of source data. What type of data do you need? Sales data, web analytics data, genetic data, IoT-device data, CRM data? Many organisations will use similar types of data for analysis. And what kind of questions do you want to answer about this data? Maybe compare time-periods, or calculate a sum or an average or a ratio between two metrics? Maybe you want to understand trends or anomalies. In any case it is unlikely that you care about one single observation. It is more likely that you care about combinations or aggregations of observations.

Now put yourself in the shoes of the developer or product owner who is responsible for the application that generates the source data. You might be working on a web shop, a CRM system, a SaaS application, or a DNA sequencing system. What is the most important thing for you? It is making sure that nothing gets lost: no transaction dropped, no relation missed, no sequence left incomplete. As a generator of source data, you care about the reliability and quality of your application for the users of that application, but as an analyst you care about answering questions from business stakeholders about the usage of that application.

In other words, the stakeholders of the source system care about writing data reliably and fast, while the stakeholders of the destination system care about reading data in high volumes with acceptable latency. This discrepancy is what causes a distinct difference in design between source and destination systems. That is why usually your source data will be stored in a row-based, normalised, write-first database, while your analytics data benefits from being stored in a columnar, de-normalised, read-first database.

The 8 Steps of a Data Ingestion Pipeline

It goes without saying that every data ingestion pipeline is a unique snowflake that is special to your organisation and requirements. Nevertheless, every pipeline shares a few common characteristics that are essential to setting up a long-term process of moving data from point A to B. Sometimes that process can be as simple as a few clicks in a nice-looking interface, but other times, especially when the origin of the data is unique to your organisation, you will have to create a custom pipeline or integration and take care of the additional complexity that comes with it. Even in the so-called “happy flow” of adding a new source through a user interface, you will have to think about most of the steps in one way or another.

So, here are the 8 steps you should consider when setting up a data ingestion pipeline.

Trigger

The first step in any pipeline is to decide when your pipeline should run, or rather to make sure that it runs at the time or on the event that you have decided on. Even though this might seem simple, there are actually many different types of triggers. Very often a schedule is the simplest way of triggering a pipeline. It can be as simple as running a job every day at midnight, or every hour of the day. In cron terms a daily midnight job would be 0 0 * * *, meaning the job runs at minute 0 of hour 0 on every day of the month, every month of the year, Sunday to Saturday. If you want it to run at minute 13 of every third hour during business hours, Monday to Friday, it would be 13 9-17/3 * * 1-5, which fires at 09:13, 12:13 and 15:13.
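To make the cron notation a bit more tangible, here is a minimal sketch that expands a single cron field into the values it matches. The function name expand_cron_field is made up for this illustration, and it only covers numbers, ranges, lists, * and /step. Real cron implementations also accept things like month and weekday names, so treat this as a reading aid rather than a cron parser.

```python
def expand_cron_field(field: str, lo: int, hi: int) -> list[int]:
    """Expand one cron field (e.g. '9-17/3') into the sorted values it matches.

    lo and hi are the bounds of the field, e.g. 0-59 for minutes.
    """
    values = set()
    for part in field.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            first, last = rng.split("-")
            start, end = int(first), int(last)
        else:
            start = int(rng)
            # A single value with a step ("5/15") runs to the top of the range.
            end = hi if step_text else start
        values.update(range(start, end + 1, step))
    return sorted(values)


# The fields of "13 9-17/3 * * 1-5" that are not wildcards:
print(expand_cron_field("13", 0, 59))      # minute       -> [13]
print(expand_cron_field("9-17/3", 0, 23))  # hour         -> [9, 12, 15]
print(expand_cron_field("1-5", 0, 6))      # day of week  -> [1, 2, 3, 4, 5]
```

Reading the hour field this way shows why the job fires three times per working day and not at 17:13: the step of 3 starting at 9 never lands on 17.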

For all its simplicity, the cron job can get very complex when you have many different jobs running at the same or similar times, especially when these jobs have dependencies on each other and the duration of a job varies with the underlying data it is processing. There are many times when staggering cron jobs works totally fine, but sometimes you will need to handle the dependencies between two systems and only start the next job when the first one has finished. A good, open source, and very widely used scheduling tool that can handle this type of dependency is Apache Airflow. It allows you to trigger jobs on a schedule and depending on whether other jobs have finished, as well as handle additional complexity like dynamically generating jobs from lists of data, for example a list of tables in a database. In essence though, what a tool like Airflow does differently from a simple schedule is run jobs based on events.

An event trigger is essentially a simple message that starts a job. It can be a message that a previous job has finished, but there are many other relevant messages out there that can be used as triggers. Some commonly used events are:

  • A new file that has been added to a folder
  • A new table that has been added to a database
  • An HTTP request to an endpoint (webhook)
  • Specific log or audit events like access or permissions for users or service accounts
  • Billing or consumption thresholds that have been reached
  • A queue that holds more than a specified number of items
  • A branch from your version control system has been changed

These types of events are often push-based, meaning that another system pushes a signal whenever something changes in that system. However, not all systems have this capability. If a system doesn’t, you can implement a pull-based alternative instead. This is basically a sensor in your scheduling tool that checks whether an event, like rows being added to a table, has occurred and acts as a trigger if it has.
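A pull-based trigger can be sketched in a few lines. The example below polls a folder for new files, which is one of the events listed above. The function names, the polling interval and the maximum number of checks are all assumptions for illustration. A scheduling tool will typically ship its own sensor mechanism, so this only shows the idea.

```python
import os
import tempfile
import time


def new_files(folder: str, seen: set[str]) -> list[str]:
    """Return the file names that appeared in `folder` since the last check."""
    current = {entry.name for entry in os.scandir(folder) if entry.is_file()}
    added = sorted(current - seen)
    seen.update(added)  # remember them so each file triggers only once
    return added


def wait_for_new_files(folder, seen, interval_s=60, max_checks=10, sleep=time.sleep):
    """Poll the folder until something new shows up or we run out of checks."""
    for _ in range(max_checks):
        added = new_files(folder, seen)
        if added:
            return added  # this is the moment the pipeline would be triggered
        sleep(interval_s)
    return []


# Small demonstration against a temporary folder.
with tempfile.TemporaryDirectory() as folder:
    seen: set[str] = set()
    print(new_files(folder, seen))  # nothing there yet
    open(os.path.join(folder, "sales.csv"), "w").close()
    print(new_files(folder, seen))  # the new file is reported once
    print(new_files(folder, seen))  # already seen, so no second trigger
```

Note that the set of seen files is itself a small piece of state: if the sensor restarts with an empty set, every existing file will look new again.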

If all of this still sounds too complex for your job, you can of course always run your pipeline manually. Supporting manual runs is good practice anyway: during development, or after certain failures or unforeseen circumstances, you might be asked to run or re-run a pipeline job manually for a specific date or time.

Connection

In its simplest form a connection for a data ingestion pipeline is nothing more than retrieving a publicly available file or URL. In some cases that might be enough, for example when you are copying the latest currency exchange rates from the European Central Bank (ECB). But unless you are copying publicly available data, transferring data from one system to another usually means you will need to authenticate in one way or another. This allows the source system to check the permissions and access level of the retrieving party.

Authentication is often the first hurdle in connecting to any source system and also a very common failure point of any pipeline. It could be that permissions were (wrongfully) revoked, that a personal user account was used and that user left the company, or that an additional step like multi-factor authentication (MFA) was added to the source system. With hacks and security failures being near-daily news, it is understandable that owners of source systems are careful with permissions and access, but unfortunately that does mean more complexity for our pipeline.

Some of the point-and-click ETL tools make it easy to authenticate a user to, let’s say, a Meta, Twitter or Google Business account. But if you want to prevent the scenario mentioned above, where a user leaves the company and your pipeline fails, it is usually better practice to use a service account. This is an account similar to a normal user account, but intended solely for service-to-service communication without the intervention (and potential security and business risks) of an individual user. Of course, even a service account still requires some sort of credential to authenticate, and that credential will have to be stored somewhere. That place should certainly not be your source code, because a key that gives access to a treasure trove of data is a valuable item that should be treated with care. Preferably the credential is stored either in a credential manager from your favourite cloud provider, or otherwise in environment variables that are specific to the server or environment that is retrieving the data from the source system.

With the credentials accessible to your code through a credential manager or environment variable, you should now be able to actually connect to the source system. Some systems provide higher-level libraries for languages like Python, C or Java to make connecting a bit more convenient, but under the hood there are usually very similar patterns across source systems. We won’t go into the details of different types of connections between systems, but here are a few common patterns.

  • HTTP GET or POST requests with the authentication token in the header that allows you to retrieve a response with a data object based on the filters or queries you provided in the request
  • An ODBC connection allows for a standardised connection specifically for databases, which is why ODBC stands for Open Database Connectivity. With ODBC you can query pretty much any database in a similar way, for example by executing a SQL query.
  • A gRPC connection allows you to execute functions (Remote Procedure Calls) on the source system and return their results. gRPC is mostly used to connect smaller independent services together and works well for distributed services.
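As an illustration of the first pattern, here is a minimal sketch of an authenticated HTTP GET request using only the Python standard library. The environment variable name SOURCE_API_TOKEN, the example URL and the use of a Bearer token are all assumptions. Your source system will document its own authentication scheme. The point is that the credential comes from the environment and never appears in the source code.

```python
import os
import urllib.parse
import urllib.request


def build_request(base_url: str, params: dict,
                  token_env: str = "SOURCE_API_TOKEN") -> urllib.request.Request:
    """Build a GET request with filters in the query string and a token in the header."""
    # Read the credential from the environment; a KeyError here means the
    # secret was not configured, which is better than failing silently later.
    token = os.environ[token_env]
    url = f"{base_url}?{urllib.parse.urlencode(params)}"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {token}"},  # assumed scheme
        method="GET",
    )


# Sending the request would look roughly like this (not executed here,
# because https://example.com/api/orders is a placeholder):
#
#   import json
#   request = build_request("https://example.com/api/orders", {"since": "2024-01-01"})
#   with urllib.request.urlopen(request, timeout=30) as response:
#       orders = json.load(response)
```

Keeping the request construction separate from the actual network call also makes it easy to test the authentication logic without touching the source system.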

State management 

Once a connection has been made to the source system, it is easy to think that the next step is to just extract the data we need. You can definitely do that, but you will soon find that without careful planning your pipeline will come to a screeching halt because you have either exhausted your resources, reached a request limit, or worse: broken the source system by overloading it with requests. If our source system is a single currency exchange rate file from the ECB we do not need to worry about managing state. But if our source system is the company’s on-premises database with 80,000 tables that the entire organisation depends on for sales and operations, you will want to manage state so that you can streamline the data extraction process without overloading the source system.

Managing state means keeping track of what has and what has not been done. State is literally the status of our pipeline: the data that describes where it is in its operation. Most of the functions you use or write yourself will be stateless, that is to say they do not require the presence of globally available data, but can compute an output based purely on an input. A program as a whole, however, must often be stateful. In the case of our on-premises database, we do not want to start over from zero every time; we want to be able to continue where we left off whenever there is an error or a timeout. That requirement becomes even more important when we make our program distributed, for example when we have multiple machines query different tables of the database at the same time. These machines need shared information to be able to operate independently and to make sure they are not doing the same task at the same time.

State management does not have to be complex, although it definitely can be, and in some situations has to be. Most of the time a simple text file or even a global variable in your program is enough, and if you need a little bit more, a lightweight (open source) database like SQLite or DuckDB can usually do the trick. Implementing some kind of state management in your program allows you to really improve your data ingestion pipeline, both in terms of performance and in terms of data quality. Some common examples of keeping state are:

  • Keeping track of the status of a data extraction. This allows you to understand which extractions can be started and which have already completed.
  • Keeping track of extractions that have timed out or have errors so that you can retry them or even send an alert.
  • Keeping track of batches of data that are being extracted so that you can remove or adjust the entire batch whenever you find an error in a single item in the batch.
  • Keeping track of the start and end date and time so that you can understand how long a certain extraction is taking or how the extraction duration is increasing or decreasing over time.
  • Keeping track of what has been added to or removed from the source system so that you can keep track of changes in the source system over time. This could be for auditing purposes or for keeping a history of a source system over which you have no or little control.
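A few of these examples can be covered with a single small table. The sketch below uses SQLite from the Python standard library to track status, attempts, and start and end times per source table. The table name extraction_state, the column names and the status values are invented for this illustration, so adapt them to whatever your pipeline needs to remember.

```python
import sqlite3
from datetime import datetime, timezone


def open_state(path: str = "pipeline_state.db") -> sqlite3.Connection:
    """Open (or create) the state database with one row per source table."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS extraction_state (
               table_name  TEXT PRIMARY KEY,
               status      TEXT NOT NULL DEFAULT 'pending',
               attempts    INTEGER NOT NULL DEFAULT 0,
               started_at  TEXT,
               finished_at TEXT
           )"""
    )
    return conn


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def register(conn, table_names):
    """Add tables we have not seen before; existing rows are left untouched."""
    conn.executemany(
        "INSERT OR IGNORE INTO extraction_state (table_name) VALUES (?)",
        [(name,) for name in table_names],
    )
    conn.commit()


def mark_started(conn, table_name):
    conn.execute(
        "UPDATE extraction_state SET status = 'running', attempts = attempts + 1, "
        "started_at = ?, finished_at = NULL WHERE table_name = ?",
        (_now(), table_name),
    )
    conn.commit()


def mark_finished(conn, table_name, ok: bool):
    conn.execute(
        "UPDATE extraction_state SET status = ?, finished_at = ? WHERE table_name = ?",
        ("done" if ok else "failed", _now(), table_name),
    )
    conn.commit()


def todo(conn, max_attempts: int = 3):
    """Tables that still need work: never started, or failed with retries left."""
    rows = conn.execute(
        "SELECT table_name FROM extraction_state "
        "WHERE status IN ('pending', 'failed') AND attempts < ? "
        "ORDER BY table_name",
        (max_attempts,),
    )
    return [row[0] for row in rows]
```

After a crash or timeout, calling todo() tells the pipeline where to continue instead of starting from zero, and the difference between started_at and finished_at gives you the extraction duration over time.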

Data Extraction

With all the complexity of triggering your pipeline, authenticating the connection and keeping state, you might almost forget the most important part: extracting data. Interestingly enough this is both the simplest and the hardest part. It is simple because it is often nothing more than retrieving a set of rows or a data object, but it is hard because retrieving the correct data can require a deep understanding of the source system. If you remember that the purpose of a source system is often wildly different from that of the analytical function that a data ingestion pipeline is trying to fulfill, you can start to see that using source data for analyses will always, to some extent, require an understanding of the source system.

If that is too abstract, think of a situation where a marketing manager wants the results of a specific Christmas advertising campaign: What did it cost? How many impressions and how many clicks did we get? It seems easy enough, but now let’s imagine that the source system keeps track of cost on the account level, that clicks and impressions are recorded on the individual ad level, and that the campaign name has changed over time, with this change stored in the campaign table. All of a sudden, for one simple question, you have to extract tables with account details, budget details, spend details, campaign history, ad history, campaign results and ad results. This is why a simple business question can turn into a complex pipeline, and that pipeline gets more complex when you don’t understand the intricacies of the source system.

However, when you finally do understand the data you need to gather (and you have of course documented this properly for your successor), you will find that there are many different ways of extracting the data. We will skip over the more exotic types of data extraction, but more than 90% of all data ingestion pipelines extract data in batches, or even micro-batches consisting of a minute or so of data. This is especially true when the analysis on that data doesn’t happen within 15 minutes, and for most organisations it doesn’t even happen on a daily basis. In essence, that means that most of your data extraction will be done by calling an API, reading a file or querying a database. The result will then usually be stored either in memory or as a file. For example, a data object can easily be stored as a JSON file and an array of (rows of) data could be stored as a Parquet file.

To that extent the extraction is easy, but of course everything becomes more complex when working with a distributed system. When multiple machines are extracting data at the same time, for example because a data object is too big for one machine, while writing to the same file or folder, you want to prevent write clashes without creating bottlenecks by blocking simultaneous writes altogether. Luckily, with modern-day machines and cloud computing there is little that a single machine can’t handle, so if you have the choice, keep your extraction simple. There’s plenty of complexity to go around already.
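Batch extraction on a single machine can be sketched as a loop that keeps asking the source for the next page of rows until nothing is left. In the example below, fetch_page is a stand-in for whatever actually talks to your source (an API call, a SQL query with LIMIT and OFFSET, and so on). Its signature is an assumption made for this sketch, as is the choice of JSON Lines as the output format.

```python
import json
from typing import Callable, Iterator


def extract_batches(fetch_page: Callable[[int, int], list[dict]],
                    batch_size: int = 1000) -> Iterator[list[dict]]:
    """Yield batches of rows until the source has nothing more to give.

    fetch_page(offset, limit) is a hypothetical callable that returns
    at most `limit` rows, starting at row number `offset`.
    """
    offset = 0
    while True:
        rows = fetch_page(offset, batch_size)
        if not rows:
            break
        yield rows
        if len(rows) < batch_size:
            break  # a short page means we reached the end
        offset += len(rows)


def to_jsonl(rows: list[dict]) -> str:
    """Serialise a batch as JSON Lines: one JSON object per line."""
    return "".join(json.dumps(row) + "\n" for row in rows)


# Demonstration with an in-memory "source" of five rows.
source = [{"id": i} for i in range(5)]
for number, batch in enumerate(extract_batches(lambda o, n: source[o:o + n], 2)):
    # In a real pipeline each batch would go to its own file, for example
    # f"orders-{number:05d}.jsonl", so parallel writers never share a file.
    print(number, to_jsonl(batch), end="")
```

Writing every batch to a separate file is one simple way to sidestep the write clashes mentioned above: no two writers ever touch the same file, and a failed batch can be removed or redone on its own.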

Transformations

Over the last years there has been a big discussion on the order of steps in a data ingestion pipeline. Should it be extract, transform, load (ETL) or extract, load, transform (ELT)? As you can see the ‘T’ step is the contentious part. So what is the right order? As any good consultant will tell you, the answer is: “It depends.” Another way of thinking about this is that it is not one or the other but both at the same time: ETLT. The question you should ask yourself, though, is which type of transformation you want to do in which step.

Historically the goal of an ETL process or data ingestion pipeline was not just to get the data into a data warehouse, but also to minimize the load on the data warehouse by doing some transformations in the process itself. This would make it easier to handle ‘big data’ in a normal data warehouse. However, with the advent of columnar distributed databases and data lakes like BigQuery, Snowflake, Databricks and Redshift, the need for this type of pre-processing became less prevalent while the need to access raw data became more prevalent. As we discussed in the previous chapters, this correlates with the rise of the analytics engineer, as some of the transformational parts of a data ingestion pipeline that were traditionally in the hands of data engineers shifted to the more technical analysts, that is, analytics engineers.

That is not to say there is no longer a need for transformations within the data ingestion pipeline. There are a few common use cases where in-flight transformations make a lot of sense. However, the days of big transformations and data modelling inside the ingestion pipeline for cost or storage reasons have indeed passed. We will go through some of the most common transformations for data ingestion pipelines, but in general you should be aware that the mantra is always: “It depends.” If some transformation makes sense for your specific use case or organisation, you do not have to listen to an ELT absolutist.

  • Filtering: Even though you could store all data in your data lake, it does not mean you have to. Data still does add up and performance can decrease when reading through massive amounts of files. If you know there is data that you will never need, for example because it is from a different department or business unit or it is from a time period that doesn’t make sense to analyse, it makes a lot of sense to filter out this data.
  • Cleaning: You will likely not want to do too much cleaning, or at least not in a way where any business logic is applied, but in general it does make sense to clean up fields or data that can cause confusion or breaking later on. This could be as simple as making sure that a field named ‘date’ is not actually a timestamp, or that a string is not escaped multiple times. Basically it is anything to make sure that the data and data types match the expectations of the receiver.
  • Joining: Whenever a source system is heavily normalized, that is when everything is split up into many different tables for dimensions and facts, it might make sense to join these tables in your ingestion pipeline. This could be de-normalisation for performance reasons, or just for the convenience of having one table downstream that is a single source of truth.
  • Enriching: You can argue that all kinds of joins between different systems should be done as late as possible so that information is always up to date, but for some information you just know that it is never going to change. Some enrichments or lookups can save you a lot of time downstream while simultaneously increasing performance. Just think of enhancements like looking up a geolocation based on an IP address, adding in the weather for the day and location or splitting up a string based on a separator.
  • Reshaping: Sometimes data is not in the shape that you’d want it to be. This is especially the case when you are trying to go from a data object to a set of rows. For example it usually doesn’t make sense to have a table with just one column that then contains a (JSON) data object. Instead you would usually want to flatten this object’s keys into separate columns in your table. Other times you may just want to make sure that you get the data from a specific set of keys in the data object and populate the values in the corresponding columns.
  • Deduplication: Making sure there are no duplicates in your data either through quirks in the source system or because of processing steps in the ingestion pipeline can save a lot of headaches later on. Commonly this is done by using either a unique identifier for a row, file or object or by calculating a hash to determine if two items are in fact the same item. Sometimes this hash is also stored with the item to allow deduplication later on or to prevent processing something again that already exists.
  • PII removal: With increased focus on personally identifiable information (PII) it has become more important to consider whether or not you want to store this data at all. Some solutions allow you to remove it altogether, as usually you will not even need it for the downstream analysis. If you do need it, you will usually not need the PII itself, but the PII as a factor to distinguish people in your dataset. The solution can usually be as simple as hashing an email address or other unique identifier, so that values can still be distinguished as belonging to the same entity, but not in a way that leads back to a specific person. Of course, depending on the type of data you are handling and the region in which you process the data, this might change or even depend on the level of consent that an end user has given.
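Three of the transformations above (reshaping, deduplication and PII removal) are small enough to sketch with the standard library. All function names are made up for this example. For the PII part the sketch uses a keyed hash (HMAC) instead of a plain hash. That is my own choice and not something the steps above prescribe: a plain hash of an email address can be reversed by simply hashing a list of known addresses, while a keyed hash requires the secret key as well.

```python
import hashlib
import hmac
import json


def flatten(obj: dict, prefix: str = "", sep: str = "_") -> dict:
    """Reshape: turn nested keys into flat column names like 'user_geo_country'."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


def row_hash(row: dict) -> str:
    """Hash a row's content; sorting the keys makes the hash order-independent."""
    canonical = json.dumps(row, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def deduplicate(rows: list[dict]) -> list[dict]:
    """Keep the first occurrence of every distinct row."""
    seen, unique = set(), []
    for row in rows:
        digest = row_hash(row)
        if digest not in seen:
            seen.add(digest)
            unique.append(row)
    return unique


def pseudonymize(value: str, secret_key: bytes) -> str:
    """Replace an identifier with a keyed hash so rows can still be grouped."""
    normalized = value.strip().lower()
    return hmac.new(secret_key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()


event = {"id": 1, "user": {"email": "Ann@Example.com", "geo": {"country": "NL"}}}
row = flatten(event)
# The key would come from a credential manager, never from source code.
row["user_email"] = pseudonymize(row["user_email"], secret_key=b"demo-key-only")
print(sorted(row))
```

Storing the output of row_hash next to each row is what makes deduplication possible later on, as described in the deduplication step.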

Validation and Data Quality

We have already discussed the importance of understanding the source system as well as performing cleaning and deduplication on incoming data. These are important steps to eventually give a certain guarantee on the quality of the data in the data warehouse, but even more important is making sure the data conforms to a specific schema. A schema is nothing more than a definition of which field should contain which type of data (string, number, date, etc.). Simple as that sounds, a schema can get complex very quickly when you have nested fields of objects of arrays of objects. Nonetheless, a schema can help you both in validating the data coming in and in documenting the data going out, so an analyst knows what to expect in the data warehouse.

Data quality is more than just checking incoming values against a schema though. If you care about data quality for your pipeline, one of the first steps you can add, before going all in on a data observability tool, is checking for some common issues or values that you are not expecting. This is usually as simple as checking either the uniqueness of a row or object (which should be easy with the unique identifier or hash key that we generated in the previous step) or the existence of NULL values where they are not expected. If you want to go one step further you can start with some simple anomaly detection or profiling of your data. For example, what was the average number of rows you imported over the last few days and what was the variance? Now you can ask yourself: How much deviation from this average am I willing to allow before I send out an alert?
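The three checks described here (schema, uniqueness and NULLs, and a simple row count anomaly) could look roughly like the sketch below. The example schema, the field names and the threshold of three standard deviations are all assumptions. The right threshold is exactly the “how much deviation am I willing to allow” question and depends on your data.

```python
from statistics import mean, stdev

# A hypothetical flat schema: field name -> expected Python type.
SCHEMA = {"order_id": int, "country": str, "amount": float}


def validate_row(row: dict, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means the row conforms."""
    errors = []
    for field, expected in schema.items():
        if field not in row or row[field] is None:
            errors.append(f"{field}: missing or NULL")
        elif not isinstance(row[field], expected):
            errors.append(
                f"{field}: expected {expected.__name__}, got {type(row[field]).__name__}"
            )
    return errors


def duplicate_keys(rows: list[dict], key: str) -> set:
    """Return the key values that occur more than once."""
    seen, dupes = set(), set()
    for row in rows:
        value = row.get(key)
        if value in seen:
            dupes.add(value)
        seen.add(value)
    return dupes


def row_count_is_anomalous(history: list[int], today: int,
                           max_sigmas: float = 3.0) -> bool:
    """Flag today's row count if it strays too far from the recent average."""
    if len(history) < 2:
        return False  # not enough history to say anything sensible
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return today != mu
    return abs(today - mu) > max_sigmas * sigma


print(validate_row({"order_id": "1", "country": None}, SCHEMA))
print(row_count_is_anomalous([1000, 1040, 980, 1010, 990], today=400))
```

Whether a failed check should stop the load, quarantine the offending rows or only raise an alert is a design decision that belongs with the guarantees you agreed on with your stakeholders.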

Loading

With a clean and validated dataset we can finally proceed to load the data into our data warehouse. There are many ways to do this, but one important thing to note is that a modern-day data warehouse doesn’t need everything to be stored in the actual warehouse. We mentioned before that data objects are often already stored as files in an intermediate step. These files can be loaded into the data warehouse, as most data warehouse solutions will allow you to easily import Parquet, JSON, Avro or CSV files, but you can also decide to go for what is often called a data lake. Essentially, a data lake is nothing more than a semi-structured collection of files in some sort of cloud storage like AWS S3, Google Cloud Storage, or Cloudflare R2. Most data warehouses are able to access a data lake, or rather a collection of files, through something called an external table. An external table is a way of keeping the actual data in its original location and only keeping a catalog with metadata in the data warehouse.

If you think about it, that is not very different from an actual data warehouse. You could think of a table in your data warehouse as if it was based on files in folders. If you apply partitioning to a column, for example a column with dates, each date will be a separate folder. If you also apply clustering, for example on country, there could be another set of sub-folders for different countries. Depending on the size of your table, you could split the data in the folder into one or more files that can then be processed independently when you execute a query. For example, looking up the sum of a metric, let’s say sales, for a specific country in a specific date range means you can now just grab all the files from the relevant folders, calculate the sum per file independently and then add the results together to get the final sum of sales.
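The files-in-folders mental model is easy to try out locally. The sketch below writes rows into date and country folders and then sums sales by reading only the folders that match the query. The key=value folder names, the file name part-0.jsonl and the use of JSON Lines instead of Parquet are assumptions made to keep the example dependency-free, and a real warehouse implements clustering differently under the hood.

```python
import json
import tempfile
from pathlib import Path


def write_partitioned(rows: list[dict], root: Path) -> None:
    """Write rows under root/date=.../country=.../part-0.jsonl."""
    for row in rows:
        folder = root / f"date={row['date']}" / f"country={row['country']}"
        folder.mkdir(parents=True, exist_ok=True)
        with open(folder / "part-0.jsonl", "a", encoding="utf-8") as handle:
            handle.write(json.dumps(row) + "\n")


def sum_sales(root: Path, country: str, start: str, end: str) -> float:
    """Sum sales for one country in a date range, reading only matching folders."""
    total = 0.0
    for date_dir in root.glob("date=*"):
        date = date_dir.name.split("=", 1)[1]
        if not (start <= date <= end):  # ISO dates compare correctly as text
            continue  # skip this partition without opening a single file
        country_dir = date_dir / f"country={country}"
        if not country_dir.is_dir():
            continue
        for part in country_dir.glob("*.jsonl"):
            # Each file is summed independently; the partial sums are then added.
            with open(part, encoding="utf-8") as handle:
                total += sum(json.loads(line)["sales"] for line in handle)
    return total


rows = [
    {"date": "2024-01-01", "country": "NL", "sales": 10.0},
    {"date": "2024-01-02", "country": "NL", "sales": 5.0},
    {"date": "2024-01-01", "country": "DE", "sales": 7.0},
    {"date": "2024-01-03", "country": "NL", "sales": 100.0},
]
with tempfile.TemporaryDirectory() as tmp:
    write_partitioned(rows, Path(tmp))
    print(sum_sales(Path(tmp), "NL", "2024-01-01", "2024-01-02"))  # 15.0
```

The German rows and the rows from 3 January are never read at all, which is the whole point of partitioning: the query touches less data the more specific it is.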

Archiving and retention

It is easy to think that when the data is loaded into the data warehouse your job is done. However, the truth is that you will need to take care of your data after it has been loaded. You will need to consider whether you want your data to persist indefinitely or whether you want to limit retention to a certain period. Especially when it comes to user or medical data there may be limits to how long data is allowed to be stored. On top of that, a user might file a request for their data to be deleted, and you will need a strategy to handle that deletion request as well. Finally, you will have to consider how ‘active’ you want your data to be. Some data is only relevant as a backup, and even though storage has become cheaper and cheaper, as the amount of data grows it might still be worth it to move archives and backups to a cheaper, so-called ‘cold’ storage solution.
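A retention policy often boils down to a decision per partition based on its age. Here is a minimal sketch of that decision, together with a naive handler for a deletion request. The thresholds of 30 and 365 days, the action names and the user_key field are placeholders. The real values follow from your legal requirements and storage costs, not from this example.

```python
from datetime import date


def lifecycle_action(partition_date: date, today: date,
                     cold_after_days: int = 30, retention_days: int = 365) -> str:
    """Decide what to do with a partition based on its age in days."""
    age = (today - partition_date).days
    if age > retention_days:
        return "delete"   # past the retention period
    if age > cold_after_days:
        return "archive"  # rarely read: move to cheaper 'cold' storage
    return "keep"


def remove_user(rows: list[dict], user_key: str) -> list[dict]:
    """Handle a deletion request by dropping every row of one user."""
    return [row for row in rows if row.get("user_key") != user_key]


today = date(2024, 4, 1)
for partition in (date(2024, 3, 20), date(2024, 1, 1), date(2023, 1, 1)):
    print(partition.isoformat(), lifecycle_action(partition, today))
```

If identifiers were pseudonymized during ingestion, a deletion request has to be translated into the same pseudonymized key before rows can be matched, which is worth keeping in mind when you design that step.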

3 Key topics for managing ingestion pipelines

Apart from going through the steps of setting up a data ingestion pipeline, there are also three important topics that are relevant to each step or the entire pipeline.

Scalability & resilience

As the load on your pipeline increases over time there is more and more pressure on your pipeline to keep up in terms of performance. Even though you might start with a sequential, single-threaded program, as is common when writing in Python for example, over time you might want to consider turning parts of your pipeline into loosely coupled functions that can scale independently. For example, the extraction might happen on a single machine that you might have to increase in size over time, while the transformations and loading scale dynamically with serverless functions depending on the load.

In any case you will have to implement some sort of error or exception handling to be able to handle common exceptions like timeouts, expired credentials for authentication, scheduled maintenance, or hitting rate limits. Your program should have some form of triaging for these types of exceptions to determine which ones it can handle and for which ones it should fail or alert. If it is able to handle the exception, you might want to retry the extraction process. However, you also don’t want your program to get stuck in an infinite loop of retries. Often this is mitigated by specifying a maximum number of retries. To also make sure you do not waste all your retries immediately on a service that is just down for a few minutes, there is a technique called exponential back-off. This means that the time between retries grows exponentially, for example by going from 1 minute to 4 minutes to 16 minutes.
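The retry logic described above fits in a handful of lines. In this sketch RetryableError is a made-up exception class that stands for the errors your triage marks as worth retrying, such as timeouts and rate limits. Anything else, like expired credentials, is deliberately not caught and fails immediately. The defaults reproduce the 1, 4 and 16 minute example, and the sleep function is passed in so the logic can be tested without actually waiting.

```python
import time


class RetryableError(Exception):
    """Hypothetical marker for errors worth retrying (timeouts, rate limits)."""


def retry_with_backoff(task, max_retries: int = 3, base_delay_s: float = 60.0,
                       factor: float = 4.0, sleep=time.sleep):
    """Run task(); on a retryable error wait 60s, 240s, 960s, ... and try again."""
    for attempt in range(max_retries + 1):
        try:
            return task()
        except RetryableError:
            if attempt == max_retries:
                raise  # out of retries: let the caller fail or alert
            sleep(base_delay_s * factor ** attempt)


# Demonstration: a task that times out twice before it succeeds.
calls = {"count": 0}


def flaky_extraction():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RetryableError("source system timed out")
    return "extracted"


waits = []
print(retry_with_backoff(flaky_extraction, sleep=waits.append))  # extracted
print(waits)  # [60.0, 240.0]
```

In practice many implementations also add a bit of random jitter to each delay so that a fleet of workers does not retry at exactly the same moment, but that is an extension beyond what is described here.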

Monitoring, logging, alerting

Even if you have checked everything twice, errors will still happen, either because of a scenario that you didn’t think of or because of something that is fully outside of your control. When they do, you first need to know that an error actually occurred, then be able to pinpoint exactly where it occurred, and finally be able to debug and understand the source of the error. Setting an alert will help you get notified about any issues with your pipeline in the channel of your choice. However, there is a delicate balance between avoiding alert fatigue from an overload of false positives and making sure you do not miss an alert when there is an actual issue. Monitoring allows you to understand the health of parts of your pipeline or of multiple pipelines. Where alerting is pushing information, monitoring is pulling information. Sometimes, especially when you have multiple pipelines, it can be worth monitoring those pipelines on different metrics like the number of rows ingested or the last ingestion time. Finally, logging will allow you to debug your pipeline when something goes wrong and identify the root cause.

Governance

A cornerstone of any data pipeline is building trust. You can have as many alerts or monitors as you like, but if you alert on the wrong issues or monitor irrelevant metrics, the trust in your data will eventually erode. Governance starts with setting the right expectations among stakeholders to build and keep that trust. You can formalize that trust with a service level agreement (SLA) that determines, among other things, the required quality, interval, and uptime of your data pipeline as well as what happens when these standards are not met. More recently data contracts have become popular, as they serve a similar purpose but are more oriented towards data consumers instead of services.

While contracts and agreements are often internal to an organization, there are also external factors like privacy, security, and legislation that need to be taken into account when thinking about data governance. The requirements may vary for your type of data, organization, or country, but common sense goes a long way here. So, start with simple measures like limiting access and permissions to your data and pipeline infrastructure as well as making sure proper authorization is in place for every part of your pipeline so only specific roles or users can read, write, or delete data. This will help you prevent data breaches and compliance issues.