Building an ETL pipeline from device to cloud (part 4)

Published by Coenraad Pretorius

In this part, we will create an uploader to load data to Azure Blob storage and handle some leaky pipes.

Image adapted from pch.vector and Freepik

Series breakdown

In this series of blog posts, I cover building an ETL data pipeline using Python. The data comes from field devices installed on mobile equipment. Each post focuses on a specific part of the pipeline that moves data from the field device to the cloud.

Adding our uploader method

Building on the work we did in part three, where we added a background worker, we will expand it with a new upload method. We are going to use Azure Blob Storage to store our raw JSON data and transformed CSV data.

For this we need to create a Blob storage resource and copy its access keys. Note that by default the storage account is open to the public internet and these keys give full access to the storage account. For improved security in production, we should use shared access signatures (SAS) or a service principal. We copy the connection string and add it to our .env file under the key AZURE_STORAGE_CONNECTION_STRING. Remember to ensure that .env is listed in your .gitignore file.

We also need to install the Python package with pip install azure-storage-blob and import the libraries. We will be using BlobServiceClient and ContentSettings.

# import Azure Storage libraries
from azure.storage.blob import BlobServiceClient, ContentSettings
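
As a minimal sketch of how the connection string can be read at runtime (assuming python-dotenv, or any similar loader, is used to pick up the .env file):

# load the connection string from .env (assumes python-dotenv; any loader works)
import os
from dotenv import load_dotenv

load_dotenv()
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")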

Now we create a new upload_data_to_azure method in our process_json.py file. We will pass the generated file names and data to this method. We declare two flags that we use to confirm our files uploaded successfully. As we are uploading data to the cloud, we can have some leaky data in our pipeline, due to internet connectivity, slow networks, issues with firewalls, and so on. For this, we will build in a mechanism to handle upload errors.

Back to our method: we load our connection string and create a BlobServiceClient. After that, we define the container and paths to use for our raw JSON data and transformed CSV data. First, we try to upload our JSON data. There may be additional settings and flags we need to set to ensure our data is uploaded in the correct format. Azure Storage Explorer is great for verifying that the data was uploaded correctly and that we can download and open the files. After this upload we set has_json_uploaded to True.

We create a similar try/except block to upload the transformed CSV file. Note that the settings here are slightly different because of the data type. Again, we set has_csvgz_uploaded to True and return both flags for use in our process_data_task method.

View this gist on GitHub
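
For illustration only, a stripped-down version of upload_data_to_azure could look roughly like the sketch below; the container name, blob paths and content types are assumptions, not the exact code from the gist.

import logging

def upload_data_to_azure(raw_filename, raw_data, transformed_filename, transformed_data):
    """Upload the raw JSON and transformed CSV data; return a success flag for each."""
    has_json_uploaded = False
    has_csvgz_uploaded = False

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container = "etl-data"  # assumed container name

    # upload the raw JSON file
    try:
        blob_client = blob_service_client.get_blob_client(
            container=container, blob=f"raw/{raw_filename}"
        )
        blob_client.upload_blob(
            raw_data,
            overwrite=True,
            content_settings=ContentSettings(content_type="application/json"),
        )
        has_json_uploaded = True
    except Exception as error:
        logging.error(f"Raw JSON upload failed: {error}")

    # upload the transformed CSV file (gzip-compressed, hence different content settings)
    try:
        blob_client = blob_service_client.get_blob_client(
            container=container, blob=f"processed/{transformed_filename}"
        )
        blob_client.upload_blob(
            transformed_data,
            overwrite=True,
            content_settings=ContentSettings(content_type="application/gzip"),
        )
        has_csvgz_uploaded = True
    except Exception as error:
        logging.error(f"Transformed CSV upload failed: {error}")

    return has_json_uploaded, has_csvgz_uploaded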

Updating our transform method

We need to slightly modify our transform_data method to return the transformed file name and data frame. We just add a return statement: return transformed_filename, df.
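
In context, the tail of the method now looks something like this; the body shown is a hypothetical stand-in for the transformation built in part three, and only the return statement is the actual change.

import pandas as pd

def transform_data(raw_filename, raw_data):
    # hypothetical stand-in for the transformation from part three
    df = pd.json_normalize(raw_data)
    transformed_filename = raw_filename.replace(".json", ".csv.gz")
    # new: return the file name and data frame for the uploader
    return transformed_filename, df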

Updating our main process method

The first change we make is to store the transformed_filename and transformed_data returned by our modified transform method above. We then add a call to our upload_data_to_azure method and store the two upload flags. Note that I had an issue converting the data frame and uploading it to Blob storage; for now, I just read the raw file from disk.

If the files have uploaded successfully, we are done with our process method. If they have not, we add additional checks that create copies of these files on the local disk: we store them in a failed_upload directory, split between raw and processed. That way, rather than digging through log files, we can quickly check these directories.

View this gist on GitHub
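
In outline, and with the directory layout and argument names as assumptions rather than the exact code from the gist, the updated flow is roughly:

import os
import shutil

def process_data_task(raw_filename, raw_data):
    # store the file name and data frame returned by the modified transform method
    transformed_filename, transformed_data = transform_data(raw_filename, raw_data)

    # upload both files and keep the success flags
    has_json_uploaded, has_csvgz_uploaded = upload_data_to_azure(
        raw_filename, raw_data, transformed_filename, transformed_data
    )

    # copy anything that failed to upload into failed_upload/raw or failed_upload/processed
    if not has_json_uploaded:
        os.makedirs(os.path.join("failed_upload", "raw"), exist_ok=True)
        shutil.copy(raw_filename, os.path.join("failed_upload", "raw"))
    if not has_csvgz_uploaded:
        os.makedirs(os.path.join("failed_upload", "processed"), exist_ok=True)
        shutil.copy(transformed_filename, os.path.join("failed_upload", "processed"))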

For now, when an upload fails, we use Azure Storage Explorer to manually upload the failed files, assuming this occurs rarely. If there are regular issues or we need to make the solution more robust, we can automate this. That would be a good feature to add, and I will cover that solution, along with managing files on the local disk, in another blog post.

Conclusion

We added a new uploader method to store data in Azure Blob storage in both raw and processed formats. We also dealt with leaky data pipelines by creating separate local storage for failed uploads.

Full code available on my GitHub repo. For this post, the repo is tagged with part4.

Next up

In the next post we will create a function that reads data from blob storage and sends the data to the vendor.

