Building an ETL pipeline from device to cloud (bonus part) 

Published by Coenraad Pretorius

In this bonus part, we will add more robustness to our solution and discuss other improvements.

Image adapted from pch.vector, Freepik and Storyset.

Series breakdown

In this series of blogs I cover building an ETL data pipeline using Python. The data comes from field devices installed on mobile equipment. Each blog focuses on a specific part of the pipeline that moves the data from the field device to the cloud.

Recap of our data pipeline

We have made a lot of progress extracting data from field devices, processing the data, moving it to the cloud and making it available to our end-users. Below is an architecture diagram outlining all the components and interactions of our pipeline. I have updated the code and moved it all to a GitHub repo for easy reference.

Simplified architecture overview (excludes networking and private endpoints).

Improving our web server

In parts two, three and four we created and expanded our FastAPI web server, and while it works reliably, there are some key improvements we can make. If you recall, we stored and uploaded both the raw JSON data and the compressed CSV data.

The first improvement is to also compress the raw JSON data. While the files are small, they add up over time, and remember that we are paying cloud storage costs. Using gzip we can easily compress the file and adjust the filename accordingly. Note that this has an impact downstream, which we will get to.

After running this for several months and doing some ad hoc investigations, our next improvement became apparent: partitioning! Our data was saved in two different folders, one for raw and one for processed data, but trying to find the data for device 85xx for April 22nd at 14:20 is challenging when everything sits in a single flat folder. The solution is to implement partitioning, both locally and in cloud storage, to make it faster to find things, for the same reason computers like it as well. Using the serial number of the unit and the upload date and time, we can easily add efficient partitioning.
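As a rough sketch, the compression and partitioning on the web server could look something like this (the helper name, folder layout and filename pattern are my own for illustration, not the exact code from the repo):

```python
import gzip
import json
from datetime import datetime, timezone
from pathlib import Path

RAW_ROOT = Path("data/raw")  # hypothetical local folder for raw device data

def save_raw_payload(payload: dict, serial_number: str) -> Path:
    """Compress the raw JSON payload and store it under a serial/date partition."""
    now = datetime.now(timezone.utc)
    # Partition by unit serial number and upload date, e.g. data/raw/85xx/2022/04/22/
    folder = RAW_ROOT / serial_number / now.strftime("%Y/%m/%d")
    folder.mkdir(parents=True, exist_ok=True)

    # Same content as before, but gzip-compressed and named with a .json.gz extension
    file_path = folder / f"{serial_number}_{now:%Y%m%d_%H%M%S}.json.gz"
    with gzip.open(file_path, "wt", encoding="utf-8") as f:
        json.dump(payload, f)
    return file_path
```

The same serial-and-date path can be reused as the blob name on upload, so the partitioning carries through to cloud storage.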


We need to be careful, as our Azure Function App reads the JSON file, so we need to modify this function as well to load the compressed JSON file. First, we modify the path in function.json to listen for .json.gz files to ensure the function still triggers. Then, we change the main method of the app to decompress and decode the file before we load it.
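A minimal sketch of the function changes, assuming a blob-triggered Python function (the binding path and record structure here are illustrative):

```python
import gzip
import json
import logging

import azure.functions as func

# In function.json the blob trigger "path" changes from something like
# "raw/{name}.json" to "raw/{name}.json.gz" so compressed files still trigger the function.

def main(inputblob: func.InputStream) -> None:
    """Decompress and decode the gzipped JSON blob before processing it."""
    raw_bytes = gzip.decompress(inputblob.read())
    records = json.loads(raw_bytes.decode("utf-8"))
    logging.info("Loaded %d records from %s", len(records), inputblob.name)
    # ... continue with the existing processing and ingestion logic ...
```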


These were simple changes in our web server, but we need to make sure we understand the impact downstream and modify the other components accordingly. It is good practice to partition your data from the start, even with “temporary” solutions, as they tend to stick around for much longer than they should.

Finally, we need to ensure our web server runs reliably on our Windows-based server. After some searching, I came across the Non-Sucking Service Manager (NSSM) and found this blog helpful to get it configured. This small application allows us to run our web server as a Windows Service, with all the benefits that provides. We already have other tools that monitor Windows Services, so this was added to the existing monitoring solution.

Handling failed uploads to the cloud

In our current solution, we save our failed uploads to a different folder to make it easy to know which files have failed. Up to now, there have not been any failures, as the internet connection from the server is stable. There were some internal network issues at one point, but that meant the devices could not communicate with our server either, so data was buffered on the devices.

One option to make the failed uploads more robust is to create another script that monitors the failed-uploads folder. It should periodically try to upload the files again and remove them if successful. This needs some custom development, but check whether you already have something similar in your data engineering toolbelt that you can use.
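A minimal sketch of such a retry script, assuming the failed files live in a local folder and are pushed to Blob Storage with the azure-storage-blob SDK (the folder, container name and environment variable are placeholders):

```python
import os
from pathlib import Path

from azure.storage.blob import BlobServiceClient

FAILED_DIR = Path("data/failed")  # hypothetical folder holding failed uploads
CONTAINER = "raw-data"            # hypothetical container name

def retry_failed_uploads() -> None:
    """Try to upload every file in the failed folder again; delete it on success."""
    client = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION_STRING"])
    for file_path in FAILED_DIR.rglob("*.gz"):
        blob_name = file_path.relative_to(FAILED_DIR).as_posix()
        try:
            with open(file_path, "rb") as data:
                client.get_blob_client(CONTAINER, blob_name).upload_blob(data, overwrite=True)
            file_path.unlink()  # remove the local copy once the upload succeeded
        except Exception as exc:  # keep the file and retry on the next run
            print(f"Upload of {blob_name} failed again: {exc}")

if __name__ == "__main__":
    retry_failed_uploads()  # run periodically, e.g. from Task Scheduler
```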

Another option is using an IoT Edge device with the blob storage module. This does the same job as described above and will fit into our technology stack in future. I will be trying this out in a future blog, along with the OPC UA streaming module. Again, it comes down to knowing what is available, what other projects are using, and what can be maintained easily within the existing architecture.

Implementing data quality checks

After careful consideration, we decided to use Azure Data Explorer (ADX) as our big data platform. In part six, we created the data model to use for our data lake. The key aspect of ADX is that it is an append-only data store that is optimised for ingestion. Because of this, we can end up with duplicate data if, for example, we upload the same file again with our function app.

There are various solutions that Microsoft recommends in their documentation for dealing with duplicate data. As we are not expecting many duplicate records, we handle this within our continuous export query for our factOilData table. We add a summarize with arg_max() by both TimeStamp and TagName and project-away the additionally created TimeStamp1 column.

Another data quality check is to ensure that we are providing valid data to our end-users. For this dataset (based on the device sensors), no values are expected to be less than zero or higher than 10 000. While the upper limit of 10 000 will differ per tag, these are sensible values we can include for now. Again, our continuous export query can handle this for us using the between operator. As we are parsing the TagName to create additional key columns, we also add a simple check to ensure that it is not empty.
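Putting these checks together, the core of the query that feeds factOilData could look roughly like this (the Value column name is an assumption on my part; the actual continuous export definition is in the repo):

```kql
raw_oil_data
| where isnotempty(TagName)                               // TagName is parsed into key columns, so it must not be empty
| where Value between (0 .. 10000)                        // drop readings outside the expected sensor range
| summarize arg_max(TimeStamp, *) by TimeStamp, TagName   // keep a single record per TimeStamp/TagName pair
| project-away TimeStamp1                                 // remove the extra column arg_max() creates
```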

The benefit of not filtering our data at the web server level is that we can build a data quality dashboard from our raw ADX data. Also, if the limits change in future, we can simply modify the query without potentially losing historic data.


Improvements to our Data Model and testing

This post deals with best practices and would not be complete without improvements to our data model as well. The first is adding a data dictionary that describes each column, lists the data types and constraints, and gives example data. This is especially useful in this case, as we derive the keys for the dimension tables from the parsed TagName.

Data dictionary for our Data Model with example data (click to zoom).

One important aspect that is often overlooked is the ability to test queries and data model joins. For our project we test these in ADX as well. First, we create test data for the raw_oil_data table and include some specific bad-quality records such as duplicates, values outside our range and empty columns. Next, we also create test data for our dimension tables to ensure we can join on the keys.
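As an illustration, a few of the bad-quality test rows could be ingested with something like this (the tag format and Value column are made up for the example; the full test script is in the repo):

```kql
// Hypothetical test rows for raw_oil_data: a duplicate, an out-of-range value and an empty TagName
.set-or-append raw_oil_data <|
datatable (TimeStamp: datetime, TagName: string, Value: real) [
    datetime(2022-04-22 14:20:00), "85xx.Oil.Temperature", 85.2,
    datetime(2022-04-22 14:20:00), "85xx.Oil.Temperature", 85.2,     // duplicate record
    datetime(2022-04-22 14:21:00), "85xx.Oil.Temperature", 12500.0,  // above the 10 000 upper limit
    datetime(2022-04-22 14:22:00), "", 42.0                          // empty TagName
]
```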

The results show that our five bad quality rows were removed (returned 97 good rows) and that we successfully joined six additional columns from our two dimension tables. As an aside, Microsoft offers a free ADX cluster for testing as well as their own sample data.

Ingest our test data in ADX and execute our data model queries (click to zoom).

Version control and documentation

It is always good to determine what best practice is and implement it within the constraints of your application. All the above code should be version controlled using Git or SVN. If these are not viable options, at least have some form of automatic backup in place, even if it is another Python script that runs periodically and that you can monitor.

Documentation is important, not just for someone else assisting with or taking over the project, but also to remember what you did and why. Of course, your code should be well commented, but this doesn't replace documentation with proper diagrams, pictures, and descriptions.

Conclusion

At the time of writing, we have about 60 (out of the total 80) devices connected continuously. Each device generates 53 data points every minute, so in terms of scalability we are comfortably handling roughly 4.6 million data points per day (60 devices × 53 points × 1 440 minutes). The latency from the web server to the cloud is about six minutes, due to the batching policy on the table. There are currently 49 million records in the dataset.

Over the six blogs in this series, plus this bonus one, we covered many different technologies and integrations as we built our data pipeline. While there are standard patterns we can often use, sometimes we need to be flexible and use what is available, while making sure it can be maintained effectively with minimal effort.

During this series I have certainly learnt many new things, and while reading and doing online courses are great, nothing beats working on a real-life project. I hope you enjoyed the journey with me and got some value out of this series.

Recommendation

I came across a terrific book, Introduction to Data Engineering (affiliate link), by Daniel Beach as I was working on this project. It is an easy read on the concepts you need as a Data Engineer and was one of the inspirations for this bonus blog and all the improvements.

