Building an ETL pipeline from device to cloud (part 4)
In this part, we will create an uploader to load data to Azure Blob storage and handle some leaky pipes.
Series breakdown
In this series of blog posts I cover building an ETL data pipeline using Python. The data comes from field devices installed on mobile equipment. Each post focuses on a specific part of the pipeline that moves the data from the field device to the cloud.
- Part 1. Building a simple, secure web server
- Part 2. Scaling up the web server with FastAPI
- Part 3. Creating the ETL module as a background worker
- Part 4. Moving data to the Azure Blob storage (you are here)
- Part 5. Creating a function app to send data to the vendor
- Part 6. Ingesting data into Azure Data Explorer
- BONUS. Improving our data pipeline
Adding our uploader method
Building on the background worker we added in part three, we will expand it with a new upload method. We are going to use Azure Blob Storage to store both our raw JSON data and our transformed CSV data.
For this we need to create a Blob Storage resource and copy the access keys. Note that by default the storage account is open to the public internet and these keys give full access to the storage account. For improved security in production, we should use shared access signatures (SAS) or a service principal. We copy the connection string and add it to our .env file with the key AZURE_STORAGE_CONNECTION_STRING. Remember to make sure .env is listed in your .gitignore file.
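As a quick reference, reading that connection string back in Python could look like the sketch below. It assumes you use the python-dotenv package; the variable name is just a placeholder.

# load the connection string from .env (assumes python-dotenv is installed)
import os
from dotenv import load_dotenv

load_dotenv()
connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")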
We also need to install the Python package (pip install azure-storage-blob) and import the libraries. We will be using BlobServiceClient and ContentSettings.
# import Azure Storage libraries
from azure.storage.blob import BlobServiceClient, ContentSettings
Now we create a new upload_data_to_azure method in our process_json.py file. We will pass the generated file names and data to this method. We declare two flags that we use to confirm our files uploaded successfully. Because we are uploading data to the cloud, our pipeline can leak data due to internet connectivity, slow networks, firewall issues, and so on. For this, we will build in a mechanism to handle upload errors.
Back to our method: we load our connection string and create a BlobServiceClient. After that, we define the container and paths to use for our raw JSON data and transformed CSV data. First, we try to upload our JSON data. There may be additional settings and flags we need to set to ensure our data is uploaded in the correct format. Azure Storage Explorer is a great way to verify that the data was uploaded correctly and that we can download and open the files. After this upload we set has_json_uploaded to True.
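A minimal sketch of this first part of the method is shown below. It assumes os and logging are already imported alongside the Azure libraries above, and the container name, blob paths, and parameter names are placeholders rather than the exact code in the repo.

def upload_data_to_azure(json_filename, json_data, csvgz_filename, csvgz_data):
    # flags to track whether each file uploaded successfully
    has_json_uploaded = False
    has_csvgz_uploaded = False

    # create the client from the connection string stored in .env
    connection_string = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # container and blob paths for the raw and transformed data (placeholder names)
    container_name = "device-data"
    raw_blob_path = f"raw/{json_filename}"
    processed_blob_path = f"processed/{csvgz_filename}"

    # upload the raw JSON data
    try:
        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob=raw_blob_path
        )
        blob_client.upload_blob(
            json_data,
            overwrite=True,
            content_settings=ContentSettings(content_type="application/json"),
        )
        has_json_uploaded = True
    except Exception as error:
        logging.error(f"Failed to upload JSON data: {error}")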
We create a similar try/except block to upload the transformed CSV file. Note that the settings here are slightly different because of the data type. Again, we set has_csvgz_uploaded to True and return both flags to use in our process_data_task method.
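Continuing the sketch above, the CSV block and the return of the two flags could look like this. The gzip content settings are an assumption based on the .csv.gz file we produce.

    # upload the transformed, gzipped CSV data
    try:
        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob=processed_blob_path
        )
        blob_client.upload_blob(
            csvgz_data,
            overwrite=True,
            content_settings=ContentSettings(
                content_type="text/csv", content_encoding="gzip"
            ),
        )
        has_csvgz_uploaded = True
    except Exception as error:
        logging.error(f"Failed to upload CSV data: {error}")

    return has_json_uploaded, has_csvgz_uploaded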
Updating our transform method
We need to slightly modify our transform_data method to return the transformed file name and data frame. We just add a return statement: return transformed_filename, df.
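For context, the end of the method then looks something like this; the transformation logic itself is unchanged from part three and is elided here.

def transform_data(raw_filename, raw_data):
    # ... existing transformation logic from part three,
    # which produces transformed_filename and df ...
    return transformed_filename, df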
Updating our main process method
The first change we make is to store the transformed_filename and transformed_data returned from our modified transform method above. We then add code to call our upload_data_to_azure method and store the two file-uploaded flags. Note that I had an issue trying to convert the data frame and upload it to Blob Storage. For now, I just read the raw file from disk.
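Put together, the relevant part of process_data_task might look like the sketch below. The variable names are placeholders, and exactly which file you read back from disk depends on your own workaround for the data frame issue noted above.

    # keep the results of the transform step for uploading
    transformed_filename, transformed_data = transform_data(raw_filename, raw_data)

    # workaround: read the written file back from disk rather than
    # converting the data frame in memory
    with open(transformed_filename, "rb") as file:
        csvgz_bytes = file.read()

    # upload both files and capture the two success flags
    has_json_uploaded, has_csvgz_uploaded = upload_data_to_azure(
        raw_filename, raw_data, transformed_filename, csvgz_bytes
    )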
If both files have uploaded successfully, we are done with our process method. If they have not, we add checks that create copies of the files on the local disk, storing them in a failed_upload directory split between raw and processed. Rather than digging through log files, we can quickly check these directories.
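A minimal sketch of that fallback, with the subdirectory names assumed from the description above:

import os
import shutil

# keep a local copy of anything that failed to upload, for a manual retry later
if not has_json_uploaded:
    os.makedirs("failed_upload/raw", exist_ok=True)
    shutil.copy(raw_filename, "failed_upload/raw/")

if not has_csvgz_uploaded:
    os.makedirs("failed_upload/processed", exist_ok=True)
    shutil.copy(transformed_filename, "failed_upload/processed/")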
If an upload does fail, for now we use Azure Storage Explorer to manually upload the failed files, assuming this happens rarely. If failures become regular, or we need a more robust solution, we can automate the retries. This would be a good feature to add, and I will cover it, along with managing files on the local disk, in another blog post.
Conclusion
We added a new uploader method to store data in Azure Blob Storage in both raw and processed formats. We also dealt with leaky data pipelines by creating separate local storage for failed uploads.
Full code available on my GitHub repo. For this post, the repo is tagged with part4.
Next up
In the next post we will create a function that reads data from blob storage and sends the data to the vendor.