Building an ETL pipeline from device to cloud (part 3)

Published by Coenraad Pretorius on

In this part, we will create the Extract, Transform and Load (ETL) module to transform the JSON data into the required CSV format and add it to the web server as a background task.

Image adapted from pch.vector and Freepik

Series breakdown

In this series of blog posts, I cover building an ETL data pipeline using Python. The data comes from field devices that are installed on mobile equipment. Each post focusses on a specific part of the pipeline that moves the data from the field device to the cloud.

Improving the code for the web server

In part two we scaled up our web server using FastAPI and saved the JSON data in a local file. Now we will refactor our code so that saving the files and processing the data run as background tasks. Background tasks are activities that the device does not need to wait for. In our case, the device just needs to deliver the data to the web server. The processing that follows does not concern the device, which makes background tasks a good fit for our solution. ETL processes may also run longer, and they should not tie up the device while it waits for a response.

Create a module for background tasks

Let’s start by creating a custom module: create a folder called utils containing an empty __init__.py file. Within the folder, create another file called process_json.py with a function called process_data_task. This function first saves our JSON data to a local file on disk. This is the same code as in our main web server from part two, just moved to a background task.

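import json
from datetime import datetime

import pandas as pd

# base_dir, site and log are assumed to be defined elsewhere in this module


def process_data_task(eqmt_ip: str, json_data: dict):
    """Save the raw JSON to disk, then transform it to CSV"""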

    # get current time
    file_time = datetime.now()
    file_datetime = file_time.strftime("%Y-%m-%d_%H-%M-%S-%f")
    file_datetime_ts = file_time.strftime("%Y-%m-%d %H:%M:%S.%f")

    # generate json filename and path
    json_filename = f'data_{json_data["sn"]}_{file_datetime}.json'
    json_file_path = f"{base_dir}/data/raw/{json_filename}"

    # save json file to disk
    with open(json_file_path, "w") as jsonfile:
        json.dump(json_data, jsonfile)
    log.info(f"--- Raw JSON data file saved: '{json_filename}'.")

    # transform json data to CSV
    log.info(f"--- Starting data transformation for '{json_filename}'...")
    transform_data(eqmt_ip, file_datetime, json_data, file_datetime_ts)
    log.info(f"--- Data transformation complete for '{json_filename}'.")

    # log background tasks completed
    log.info("--- Background task completed 'process_data_task'.")
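The file-saving step of process_data_task can also be written with pathlib, which builds the path the same way on Windows and Linux and lets us create the data/raw folder if it is missing. This is a minimal sketch rather than the code from the repo: the helper name save_raw_json, the sample payload and the temporary folder are assumptions made for illustration.

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path


def save_raw_json(base_dir: Path, json_data: dict, now: datetime) -> Path:
    """Save the raw payload under <base_dir>/data/raw and return the file path."""
    raw_dir = base_dir / "data" / "raw"
    # create the folder on first use so that open() does not fail
    raw_dir.mkdir(parents=True, exist_ok=True)

    # same filename convention as process_data_task
    file_datetime = now.strftime("%Y-%m-%d_%H-%M-%S-%f")
    json_file_path = raw_dir / f'data_{json_data["sn"]}_{file_datetime}.json'

    with open(json_file_path, "w") as jsonfile:
        json.dump(json_data, jsonfile)
    return json_file_path


# hypothetical payload and a temporary folder, for illustration only
with tempfile.TemporaryDirectory() as tmp:
    saved = save_raw_json(
        Path(tmp), {"sn": "ABC123", "data": []}, datetime(2022, 1, 31, 8, 15, 0)
    )
    saved_name = saved.name
    saved_ok = json.loads(saved.read_text()) == {"sn": "ABC123", "data": []}
```

Passing the timestamp in as an argument, instead of calling datetime.now() inside the function, makes the filename predictable when testing.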

Next, create another function, called transform_data, to transform the data. This is where we create our data frame and extract the required fields from our JSON object. The data frame has three columns: Timestamp, Tag and Value, and it starts with a single row holding the device serial number. The JSON object contains an array of data objects.

We iterate through each data object, first converting the ts field from Unix time into a proper timestamp. We then create a list of Python dictionaries (rows), one for each field we need from the data object. Each row contains our three columns, and we construct the tag names using a predefined convention. Once done with one data object, we append rows to the pandas data frame and continue to the next one. At the end of the function, we save the whole data frame as a compressed CSV file.

def transform_data(eqmt_ip, file_datetime, json_body, file_datetime_ts):
    """Transform JSON data to columnar CSV"""

    # create the first row with the current timestamp and device serial number
    row = [
        {
            "Timestamp": file_datetime_ts,
            "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.DvcSrlNmbr",
            "Value": json_body["sn"],
        }
    ]
    df = pd.DataFrame.from_dict(row)

    # iterate through each data object for device type 'aa' and type 'bb'
    for ds in json_body["data"]:

        if ds["type"] == "aa":
            # convert unix time to date time field when measurement was taken
            timestamp = datetime.fromtimestamp(int(ds["ts"])).strftime("%Y-%m-%d %H:%M:%S.%f")

            # create a dictionary with Timestamp, Tag and Value from the JSON object
            rows = [
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.OlTmp",
                    "Value": ds["temperature"].replace("-", ""),
                },
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.OlVscsty",
                    "Value": ds["visco"],
                },
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.OlDnsty",
                    "Value": ds["density"],
                },
            ]
            # append rows to data frame (DataFrame.append was removed in pandas 2.0)
            df = pd.concat([df, pd.DataFrame(rows)], ignore_index=True)

        if ds["type"] == "bb":
            # convert unix time to date time field when measurement was taken
            timestamp = datetime.fromtimestamp(int(ds["ts"])).strftime("%Y-%m-%d %H:%M:%S.%f")

            # create a dictionary with Timestamp, Tag and Value from the JSON object
            rows = [
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.DvcSrlNmbr",
                    "Value": ds["sn"],
                },
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.TmSncLstRst",
                    "Value": ds["uptime"],
                },
                {
                    "Timestamp": timestamp,
                    "Tag": f"{site}.OilMon.{eqmt_ip.replace('.', '_')}.EISIntrrgtnsSncLstRst",
                    "Value": ds["sweepCount"],
                },
            ]
            # append rows to data frame (DataFrame.append was removed in pandas 2.0)
            df = pd.concat([df, pd.DataFrame(rows)], ignore_index=True)

    # create filename for processed data
    transformed_filename = f'data_{site}_{eqmt_ip.replace(".", "_")}._{json_body["sn"]}_{file_datetime}.csv.gz'

    # save to compressed CSV file
    df.to_csv(
        f"{base_dir}/data/processed/{transformed_filename}",
        compression="gzip",
        index=False,
    )
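To make the row-building step easier to follow, here is a small sketch that produces the same Timestamp, Tag and Value rows from a plain list, driven by a lookup table instead of one branch per device type. The names TAG_MAP and build_rows, the site name, the IP address and the sample payload values are all assumptions for illustration; only the field names and tag suffixes come from transform_data above. The sketch converts ts using UTC so the result does not depend on the server's time zone, whereas the function above uses local time, and it leaves out the removal of "-" from the temperature value.

```python
from datetime import datetime, timezone

# hypothetical lookup: device type -> (JSON field, tag suffix) pairs,
# using the field names and tag suffixes from transform_data
TAG_MAP = {
    "aa": [("temperature", "OlTmp"), ("visco", "OlVscsty"), ("density", "OlDnsty")],
    "bb": [
        ("sn", "DvcSrlNmbr"),
        ("uptime", "TmSncLstRst"),
        ("sweepCount", "EISIntrrgtnsSncLstRst"),
    ],
}


def build_rows(site: str, eqmt_ip: str, json_body: dict) -> list:
    """Return one Timestamp/Tag/Value dictionary per mapped field."""
    prefix = f"{site}.OilMon.{eqmt_ip.replace('.', '_')}"
    rows = []
    for ds in json_body["data"]:
        # UTC is an assumption of this sketch, not the behaviour of the code above
        timestamp = datetime.fromtimestamp(int(ds["ts"]), tz=timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S.%f"
        )
        # unknown device types are skipped
        for field, suffix in TAG_MAP.get(ds["type"], []):
            rows.append(
                {"Timestamp": timestamp, "Tag": f"{prefix}.{suffix}", "Value": ds[field]}
            )
    return rows


# made-up payload with one object of each type
sample = {
    "sn": "ABC123",
    "data": [
        {"type": "aa", "ts": "1640995200", "temperature": "45.2", "visco": "12.1", "density": "0.87"},
        {"type": "bb", "ts": "1640995200", "sn": "ABC123", "uptime": "3600", "sweepCount": "42"},
    ],
}
rows = build_rows("SiteA", "10.0.0.5", sample)
```

Because the rows are collected in one list, the data frame can be built once at the end with pd.DataFrame(rows) rather than being grown inside the loop.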

Integrating with the web server

In our main server Python file, let’s import BackgroundTasks from FastAPI and our new module by adding from utils import process_json. In our POST handler, we add background_tasks: BackgroundTasks to the arguments. We remove the code that gets the current time and saves the JSON file, as this now lives in our background task. After we get our JSON object, we add our background task.

# data upload post
@app.post("/upload/data")
async def upload_data_from_device(data_request: Request, background_tasks: BackgroundTasks):
    """Upload data from field devices"""
    # get device ip address
    eqmt_ip = data_request.client[0]
    log.info(f"--- New POST received from EqmtIP '{eqmt_ip}'.")

    # get json body from request
    try:
        json_body = await data_request.json()
    except Exception:
        log.error(f"*** Error processing JSON file for '{eqmt_ip}'. Sent 500.")
        return Response(content=None, status_code=500)

    # pass data to background tasks for saving and etl
    background_tasks.add_task(process_json.process_data_task, eqmt_ip, json_body)
    log.info("--- Background task added 'process_data_task'.")

    # important to include "status_code=200" to avoid duplicate uploads from devices
    return Response(content=None, status_code=200)
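To try the endpoint without a real device, we can imitate the upload with the standard library. This is only a sketch: the address http://localhost:8000, the helper names and the payload are assumptions, and the real devices may send additional headers or fields.

```python
import json
import urllib.request


def build_upload_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build a POST request carrying the payload as a JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/upload/data",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_payload(base_url: str, payload: dict) -> int:
    """Send the payload and return the HTTP status code."""
    request = build_upload_request(base_url, payload)
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status


# building the request does not contact the server
request = build_upload_request("http://localhost:8000", {"sn": "ABC123", "data": []})

# with the web server running locally, this call should return 200:
# send_payload("http://localhost:8000", {"sn": "ABC123", "data": []})
```

The response should arrive as soon as the task is queued, while the log lines from process_data_task appear afterwards, once the background work has run.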

Conclusion

We refactored the code for our web server, simplifying it and adding a new module to run our tasks. Once we receive the JSON object, we send the response to the device and run the longer tasks in the background. We transformed the JSON data into CSV format for easier processing.

Full code available on my GitHub repo.

Next up

In the next part, we will move our data files to the cloud using Azure Blob Storage.

