Building a memory-efficient ETL to convert JSON to CSV in Python with prefect


Have you ever built a simple ETL (Extract-Transform-Load) pipeline in Python to convert a JSON file to CSV? Easy peasy, right? But have you ever tried to transform a large dataset, where "large" means a dataset bigger than your PC's memory? That should be interesting, shouldn't it? In this tutorial, I will show you how to do exactly that in Python. Let's get into it.

Tools we’ll be using:

JSON, CSV, prefect, logging, tqdm

Dataset:

The dataset we'll be using is the Amazon Clothing, Shoes and Jewelry reviews dataset, which is about 5.1GB in size. You can download it here.

Why should this interest you?

Most data scientists, when starting their careers, begin with ready-made CSV files that they load directly into pandas with pandas.read_csv(). But in practice, most companies don't have their datasets readily available, which means the data scientist often needs to source the data themselves, from databases, the internet or anywhere else the data may live. This is where JSON comes in: JSON is the data format that rules the web, and data sent and received between servers is usually in JSON format. This makes it important to know how to work with JSON files as a data scientist.

So what is ETL? ETL, short for Extract-Transform-Load, is a data engineering term that describes the process by which data is extracted (from the web or a database), transformed to add or remove data points that may not be in the original data, and loaded in a format the data scientist can easily work with, usually CSV. So an ETL pipeline is simply a data pipeline in which data is extracted, transformed and loaded in a ready-to-use format. A pipeline here means a sequence of functions tied together to perform a particular task. So let's dive straight into it.
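Before we do, here is a toy sketch of the idea (hypothetical file names, not the pipeline we build below): chaining three small functions gives you an ETL pipeline.

# Extract: read raw lines from a source file
def extract(path):
    with open(path) as f:
        return f.readlines()

# Transform: clean each record, e.g. strip whitespace
def transform(lines):
    return [line.strip() for line in lines]

# Load: write the cleaned records to a destination file
def load(records, out_path):
    with open(out_path, "w") as f:
        f.write("\n".join(records))

# The "pipeline" is just the three steps chained together
load(transform(extract("raw.txt")), "clean.txt")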

Challenge?

The two main challenges we have are:
• The size of the dataset (about 5.1GB)
• The RAM size of the PC we are using (4GB)

This means we can't load the entire dataset into memory, because we would run out of memory. So we have to find a smart way to work with the dataset without exhausting memory or crashing the PC. The other problem is that the data schema is not entirely consistent: while the basic keys in each line are the same, the "style" key contains another dictionary whose keys differ from row to row, i.e. depending on the product, the "style" key contains different descriptions of that product.

{'overall': 5.0, 'vote': '2', 'verified': True, 'reviewTime': '05 4, 2014', 'reviewerID': 'A2IC3NZN488KWK', 'asin': '0871167042', 'style': {'Format:': ' Paperback'}, 'reviewerName': 'Ruby Tulip', 'reviewText': 'This book has beautiful photos, good and understandable directions, and many different kinds of jewelry.  Wire working and metalsmithing jewelry are covered.  Highly recommend this book.', 'summary': 'Unique designs', 'unixReviewTime': 1399161600}
{'overall': 5.0, 'verified': True, 'reviewTime': '08 12, 2017', 'reviewerID': 'A3HWZ3ARDF51IL', 'asin': 'B00009ZM7Z', 'style': {'Size:': ' 10.5 D(M) US', 'Color:': ' Fudge'}, 'reviewerName': 'Amazon Customer', 'reviewText': "My husband loves his Merrells. He won't wear any other brand at this point.", 'summary': 'Best brand - so comfy', 'unixReviewTime': 1502496000}
{'reviewerID': 'A3VJX8VUVEKG3X', 'asin': 'B0001XVUFA', 'reviewerName': 'Parisa Diba', 'verified': True, 'reviewText': 'Awesome', 'overall': 5.0, 'reviewTime': '06 15, 2016', 'summary': 'Five Stars', 'unixReviewTime': 1465948800}

So we need to find a way to extract all the columns in the dataset and represent them in the header of the new CSV file.
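The core trick we'll rely on throughout is to never parse the whole file at once; instead we iterate over it line by line, so only one record lives in memory at a time. A minimal sketch of the difference (adjust the path to wherever you saved the dataset):

import json

# Memory-hungry: builds a list holding every row of the 5.1GB file at once.
# with open("Clothing_Shoes_and_Jewelry_5.json", "rb") as f:
#     data = [json.loads(line) for line in f]

# Memory-friendly: a file object is an iterator over its lines,
# so only the current line (one small dict) is in memory at a time.
with open("Clothing_Shoes_and_Jewelry_5.json", "rb") as f:
    for line in f:
        record = json.loads(line)
        # ...process `record`, write it out, move on to the next line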

How to go about it

Let’s import the necessary libraries we’ll be using.

# Loading necessary libraries
import json
from tqdm import tqdm
import csv
from prefect import task, Flow
import logging

A breakdown of what we'll be using each of the libraries for:
• json: to read each line of the JSON file
• tqdm: to view updates or the progress of our functions as they run
• csv: to write data to the output file in CSV format
• prefect: to set up our ETL pipeline
• logging: to log progress and status messages to the terminal

Next, let us set up the log configuration for the project.

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger()

This log configuration sets the minimum logging level to DEBUG (I'll explain logging in another article), and every time we log a message to the terminal it automatically records the time the log was created (asctime), the level of the log (levelname), the name of the function that logged it (funcName) and the message itself. Next, we'll set the location of the source JSON file.
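Before that, to get a feel for what this configuration produces, here is a quick hypothetical call and the kind of line it prints (the timestamp is made up):

logger.info("Starting the ETL run")
# 2021-09-15 10:32:01,123 [INFO] <module>: Starting the ETL run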

# Location to the json file
LOCATION = "../../data/raw/Clothing_Shoes_and_Jewelry_5.json"

We'll define functions to read each line of the dataset and to get all the description keys.

def read_json(line):
    """This function reads a line of the json file as a dictionary"""
    return json.loads(line)

def get_desc_keys(line):
    """This function gets the description keys in each line"""
    KEY = "style"
    desc_dict = line.get(KEY)
    if desc_dict is not None:
        return list(desc_dict.keys())
    else:
        return None

The first function, read_json, takes in a line of the data file (after it has been opened with Python's open()) and returns a dictionary containing its content. Usually, after opening a file, you would load the whole thing at once to get the entire dataset as a list of dictionaries, but because we risk running out of memory that way, we iterate over the file line by line instead. Python reads each line as a string, which we convert to a dictionary with json.loads, so only that one line, and not the entire dataset, is held in memory. The second function, get_desc_keys, takes in a line of the file (now a dictionary) and looks for the 'style' key inside it (because some products don't have it at all). If the 'style' key is found, it returns all the keys of the nested dictionary (since the value of the 'style' key is itself a dictionary, in which each key is a descriptor of the product). If the 'style' key is absent, it returns None. Next, before we continue, we want to obtain all the description keys in the entire dataset.
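Before defining that, here is a quick sanity check of the two helpers above on a made-up line (shaped like the real rows shown earlier):

sample = b'{"overall": 5.0, "asin": "B00009ZM7Z", "style": {"Size:": " 10.5 D(M) US", "Color:": " Fudge"}}'
line_dict = read_json(sample)
print(get_desc_keys(line_dict))   # ['Size:', 'Color:']

no_style = read_json(b'{"overall": 4.0, "asin": "0871167042"}')
print(get_desc_keys(no_style))    # None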

def get_all_desc_keys(LOCATION):
    """
    This function combines all the descriptions of each data entry to obtain
    all unique descriptions in the 'style' key

    THIS FUNCTION IS TO RUN JUST ONCE
    """
    desc = list()
    with open(LOCATION, "rb") as file:
        for line in tqdm(file, desc="Getting all the descriptions"):
            json_dict = read_json(line)
            line_desc = get_desc_keys(json_dict)
            if line_desc is not None:
                desc.extend(line_desc)
    logger.info(f"{'-'*20}all product desc keys obtained successfully{'-'*20}")
    return list(set(desc))

First, we create a desc list to host all of the description keys we find, then we open the file for reading. Next, we iterate through each line of the file; tqdm gives us a progress bar so we can see how far along we are. We use the read_json function we created earlier to convert each line to a dictionary and get_desc_keys to get the description keys in that line, and we add the returned list to the desc list that collects all the description keys. After the entire file has been read, we log a message that all the product description keys have been retrieved and return a list containing only the unique values. Next, we'll create a list containing all the original keys in every line plus the descriptors in the 'style' key, and also remove the 'style' key itself, since we now fetch its values directly.
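For orientation, calling the function looks like this; the exact keys it returns depend on the dataset, so the values below are purely illustrative:

all_prod_desc = get_all_desc_keys(LOCATION)
print(all_prod_desc)
# e.g. ['Size:', 'Color:', 'Format:', 'Style:', 'Metal Type:']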

def get_columns(json_line, descriptions):
    """
    This function gets the keys (to be used as columns) and the description
    keys and combines all of them together as the columns to be used
    """
    column_keys = list(json_line.keys())
    column_keys.extend(descriptions)
    column_keys.remove("style")
    logger.info("Columns in the file obtaained successfuly")
    return column_keys

The function obtains the keys of the line (the dictionary produced by read_json), which will become our columns, combines them with the list returned by get_all_desc_keys (descriptions), and logs that the task has been completed. Everything we have done so far has been about working out which columns will be in our new CSV and which keys we'll use to look up the values; next, we'll get the actual values. But before we do, we want to keep only the lines that have a 'style' key, so we'll create a function that checks for it and returns a boolean (True or False); if the key is absent it returns False (a log line for that case is left commented out in the function, which you can enable if you like).

def PRODUCT_DESC_PRESENT(line_dict):
    """
    This function evaluates if product description (key 'style')
    exists and return bool yes or no
    """
    STYLE = "style"
    desc = line_dict.get(STYLE)
    if desc is None:
        # logger.info("product desc keys absent thus passing....")
        return False
    else:
        return True

Next, we get the actual values.

def strip_values(value):
    """This function strips the strings or return the original value"""
    if type(value) is str:
        return value.strip()
    else:
        return value

def get_values(prod_dict, columns, all_prod_desc):
    """
    This function gets the values from the keys of the json file present
    """
    new_prod_dict = dict()
    STYLE = "style"
    for key in columns:
        if key not in all_prod_desc:
            new_prod_dict[key] = strip_values(prod_dict.get(key))
        elif key in all_prod_desc:
            value = prod_dict[STYLE].get(key)
            if value is None:
                new_prod_dict[key] = "null"
            else:
                new_prod_dict[key] = strip_values(value)
    return new_prod_dict

The strip_values function checks whether the value passed to it is a string and removes any leading or trailing blank space, so we end up with compact values. The get_values function receives a line of the file (returned by read_json), all the columns (returned by get_columns) and all the product description keys (returned by get_all_desc_keys). It then iterates through the columns: if a column is not in all_prod_desc, it retrieves the value directly from the line; if it is in all_prod_desc, it retrieves the value from the 'style' dictionary instead. If the 'style' dictionary doesn't have that column, the lookup returns None and we assign 'null' as the value (you can assign anything you like here). It then returns a full dictionary with every column assigned a value. Now we are ready to start creating our new CSV files, and we'll write a couple of functions to help with this.
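Before moving on, here is what get_values produces for a row shaped like the second sample shown earlier (the columns and values are illustrative):

columns = ["overall", "verified", "reviewerID", "asin", "reviewText", "Size:", "Color:", "Format:"]
all_prod_desc = ["Size:", "Color:", "Format:"]
row = {
    "overall": 5.0,
    "verified": True,
    "reviewerID": "A3HWZ3ARDF51IL",
    "asin": "B00009ZM7Z",
    "reviewText": "My husband loves his Merrells.",
    "style": {"Size:": " 10.5 D(M) US", "Color:": " Fudge"},
}

print(get_values(row, columns, all_prod_desc))
# {'overall': 5.0, 'verified': True, 'reviewerID': 'A3HWZ3ARDF51IL', 'asin': 'B00009ZM7Z',
#  'reviewText': 'My husband loves his Merrells.', 'Size:': '10.5 D(M) US', 'Color:': 'Fudge', 'Format:': 'null'}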

def get_file_destination(LOCATION):
    """
    This function sets the destination to save the files to as 'data/processed'
    """
    destination_list = LOCATION.split("/")
    return "/".join(destination_list[:-2]) + str("/processed/")

def create_file(file_destination, n_files):
    """This function creates a new file where the csv file will be saved"""
    filename = file_destination + "FILE_" + str(n_files) + ".csv"
    file = open(filename, "w", newline="")
    n_files += 1
    logger.info(f"file {filename} created successfuly")
    return file, n_files

The get_file_destination function builds the path of the new directory where we want to save our output files. The existing dataset is in the '/data/raw' folder, whereas we want to save the output in the '/data/processed/' folder; this part is entirely optional, use it only if you want to write the files to a different folder. The create_file function creates a new file at the file_destination passed to it and adds a number to the file name (so if n_files passed to it is 1, it creates FILE_1.csv at the destination), then returns the open file handle (not closed yet) and n_files + 1. Next, we want an optional check for whether we have written 500,000 lines to the current file; when a file reaches 500,000 lines, we'll close it and start another one (this part is totally optional).
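Before that, a quick look at what these two helpers do with the path used in this article (assuming the data/processed folder already exists):

destination = get_file_destination("../../data/raw/Clothing_Shoes_and_Jewelry_5.json")
print(destination)                            # ../../data/processed/

new_file, n_files = create_file(destination, 1)
print(new_file.name, n_files)                 # ../../data/processed/FILE_1.csv 2
new_file.close()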

def NEW_FILE_BREAK(n_rows):
    """
    This function evaluates if we have 500,000 entries in the existing file
    """
    expected_break = 500000
    if n_rows % expected_break == 0:
        logger.info(f"{'-'*20}500,000 lines written successfuly{'-'*20}")
        return True
    else:
        return False

After this, we use csv.DictWriter to write each new line to our file. The CSV writer itself will be instantiated later in the main function; write_line simply receives the CSV writer and the values we'd like to write (a tiny standalone DictWriter sketch follows it below).

def write_line(csv_writer, values):
    """
    This function writes a new line into the CSV file containing all our data
    """
    return csv_writer.writerow(values)
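To see how csv.DictWriter behaves on its own, here is a small standalone sketch with made-up values; writeheader writes the column names once, and each writerow call turns a dictionary into one CSV line:

import csv

with open("demo.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["asin", "overall", "Size:"])
    writer.writeheader()   # writes: asin,overall,Size:
    writer.writerow({"asin": "B00009ZM7Z", "overall": 5.0, "Size:": "10.5 D(M) US"})
    # writes: B00009ZM7Z,5.0,10.5 D(M) US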

In this final function, we'll combine all the functions we have written, plus some minor glue logic we haven't defined before, to carry out our task.

def main():
    n_rows = 1
    n_files = 1
    columns = list()
    all_prod_desc = get_all_desc_keys(LOCATION)
    with open(LOCATION, "rb") as file:
        file_destination = get_file_destination(LOCATION)
        processed_file, n_files = create_file(file_destination, n_files)
        csv_writer = None
        for line in tqdm(file):
            line_dict = read_json(line)
            if n_rows == 1:
                columns = get_columns(line_dict, all_prod_desc)
                csv_writer = csv.DictWriter(processed_file, fieldnames=columns)
                csv_writer.writeheader()
            else:
                pass
            if PRODUCT_DESC_PRESENT(line_dict):
                n_rows += 1
                values = get_values(line_dict, columns, all_prod_desc)
                if NEW_FILE_BREAK(n_rows):
                    processed_file.close()
                    processed_file, n_files = create_file(file_destination, n_files)
                    csv_writer = csv.DictWriter(processed_file, fieldnames=columns)
                    csv_writer.writeheader()
                else:
                    pass
                write_line(csv_writer, values)
            else:
                pass
        processed_file.close()
        logger.info(f"{'-'*20}Task complete, {n_rows} written....{'-'*20}")

We start the function by initializing a few variables: n_rows, n_files and columns. We then run get_all_desc_keys and assign the result to all_prod_desc. Next, we open the original dataset with a context manager and create our first empty CSV file. We initialize csv_writer to None and begin looping through the file, reading each line into a dictionary with read_json. For the first line, we obtain all the columns (using the all_prod_desc we already retrieved) and create the csv.DictWriter. csv.DictWriter takes values from a dictionary (the one produced by get_values) and writes them as a CSV line; the usual csv.writer expects a list, whereas csv.DictWriter lets us hand it a dictionary instead. We then write the header (the list of all the columns).

For every line, we check whether it has a 'style' key. If it does, we add one to n_rows and retrieve its values (as a dictionary). We then check whether we have already written 500,000 lines to the currently open file (tracked via n_rows); if so, we close the existing file, create another one, instantiate csv.DictWriter again and write the header. Either way, we then write the values of that line into the open file. Once we are done with the whole file, we close the last open file and voilà, that is it.

if __name__ == "__main__":
    main()

Where does prefect come in?

I mean, we have written the pipeline without actually using prefect, so where do we use it? Prefect is a Python workflow manager, a data engineering tool that makes sure your code runs well and, when it fails, that it fails gracefully (meaning you know exactly where it failed). It is based on the Directed Acyclic Graph (DAG), which in simple terms is a set of Python functions tied and directed towards one another to form a pipeline (much like what we've done). It also gives you the ability to schedule when you want your pipelines to run, decide what to do when they fail and how many times to retry.

Prefect has four major units, namely task, flow, scheduler and parameters, but we'll mainly be using two: task and flow. A task turns a function into a single step in our workflow; to create one, we just add @task as a decorator to the function. Decorators are Python functions that take in another function (our function, in this case) and run it along with some extra behaviour. Any function wrapped in a prefect task needs to be used either inside the prefect Flow context manager (the second prefect tool we'll be using) or called with its .run() method (to be on the safe side, we'll use both). The other prefect tools are parameters, which let us pass different parameters into our pipeline, and the scheduler, which lets us schedule the pipeline to run at any time we specify; since we just want to run our pipeline once, we won't be using it. So I'll reimplement all of our functions with prefect.
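Before the full reimplementation, here is a minimal toy flow (toy functions, not the ones from this article) showing how @task and the Flow context manager fit together in the Prefect 1.x API used here:

from prefect import task, Flow

@task
def extract():
    return [1, 2, 3]

@task
def transform(numbers):
    return [n * 10 for n in numbers]

@task
def load(numbers):
    print(f"Loaded {numbers}")

with Flow("toy-etl") as flow:
    numbers = extract()        # calling a task inside the Flow block builds the DAG
    bigger = transform(numbers)
    load(bigger)

flow.run()   # executes extract -> transform -> load in dependency order

Now, the full pipeline reimplemented with prefect: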

# Loading necessary libraries
import json
from tqdm import tqdm
import csv
from prefect import task, Flow
import logging

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger()

# Location to the json file
LOCATION = "../../data/raw/Clothing_Shoes_and_Jewelry_5.json" 

@task
def read_json(line):
    """This function reads a line of the json file as a dictionary"""
    return json.loads(line)

def get_desc_keys(line):
    """This function gets the description keys in each line"""
    KEY = "style"
    desc_dict = line.get(KEY)
    if desc_dict is not None:
        return list(desc_dict.keys())
    else:
        return None

@task
def get_all_desc_keys(LOCATION):
    """
    This function combines all the descriptions of each data entry to obtain
    all unique descriptions in the 'style' key

    THIS FUNCTION IS TO RUN JUST ONCE
    """
    desc = list()
    with open(LOCATION, "rb") as file:
        for line in tqdm(file, desc="Getting all the descriptions"):
            json_dict = read_json.run(line)
            line_desc = get_desc_keys(json_dict)
            if line_desc is not None:
                desc.extend(line_desc)
    logger.info(f"{'-'*20}all product desc keys obtained successfully{'-'*20}")
    return list(set(desc))

def get_columns(json_line, descriptions):
    """
    This function gets the keys (to be used as columns) and the description
    keys and combines all of them together as the columns to be used
    """
    column_keys = list(json_line.keys())
    column_keys.extend(descriptions)
    column_keys.remove("style")
    logger.info("Columns in the file obtained successfuly")
    return column_keys

def PRODUCT_DESC_PRESENT(line_dict):
    """
    This function evaluates if product description (key 'style')
    exists and return bool yes or no
    """
    STYLE = "style"
    desc = line_dict.get(STYLE)
    if desc is None:
        return False
    else:
        # logger.info("product desc keys absent thus passing....")
        return True

def strip_values(value):
    """This function strips the strings or return the original value"""
    if type(value) is str:
        return value.strip()
    else:
        return value

@task
def get_values(prod_dict, columns, all_prod_desc):
    """
    This function gets the values from the keys of the json file present
    """
    new_prod_dict = dict()
    STYLE = "style"
    for key in columns:
        if key not in all_prod_desc:
            new_prod_dict[key] = strip_values(prod_dict.get(key))
        elif key in all_prod_desc:
            value = prod_dict[STYLE].get(key)
            if value is None:
                new_prod_dict[key] = "null"
            else:
                new_prod_dict[key] = strip_values(value)
    return new_prod_dict


def get_file_destination(LOCATION):
    """
    This function sets the destination to save the files to as 'data/processed'
    """
    destination_list = LOCATION.split("/")
    return "/".join(destination_list[:-2]) + str("/processed/")

@task(nout=2)
def create_file(file_destination, n_files):
    """This function creates a new file where the csv file will be saved"""
    filename = file_destination + "FILE_" + str(n_files) + ".csv"
    file = open(filename, "w", newline="")
    n_files += 1
    logger.info(f"file {filename} created successfuly")
    return file, n_files

def NEW_FILE_BREAK(n_rows):
    """
    This function evaluates if we have 500,000 entries in the existing file
    """
    expected_break = 500000
    if n_rows % expected_break == 0:
        logger.info(f"{'-'*20}500,000 lines written successfuly{'-'*20}")
        return True
    else:
        return False

@task
def write_line(csv_writer, values):
    """
    This function writes a new line into the csv file containing all our data
    """
    return csv_writer.writerow(values)

def main():
    with Flow(name="json_to_csv_etl") as flow:
        n_rows = 1
        n_files = 1
        columns = list()
        all_prod_desc = get_all_desc_keys.run(LOCATION)
        with open(LOCATION, "rb") as file:
            file_destination = get_file_destination(LOCATION)
            processed_file, n_files = create_file.run(file_destination, n_files)
            csv_writer = None
            for line in tqdm(file):
                line_dict = read_json.run(line)
                if n_rows == 1:
                    columns = get_columns(line_dict, all_prod_desc)
                    csv_writer = csv.DictWriter(processed_file, fieldnames=columns)
                    csv_writer.writeheader()
                else:
                    pass
                if PRODUCT_DESC_PRESENT(line_dict):
                    n_rows += 1
                    values = get_values.run(line_dict, columns, all_prod_desc)
                    if NEW_FILE_BREAK(n_rows):
                        processed_file.close()
                        processed_file, n_files = create_file.run(
                            file_destination, n_files
                        )
                        csv_writer = csv.DictWriter(processed_file, fieldnames=columns)
                        csv_writer.writeheader()
                    else:
                        pass
                    write_line.run(csv_writer, values)
                else:
                    pass
            processed_file.close()
            logger.info(f"{'-'*20}Task complete, {n_rows} written....{'-'*20}")
    return flow

if __name__ == "__main__":
    flow = main()
    flow.run()

In this new implementation, the prefect @task decorators have been added to the relevant functions. One last important thing to note: for tasks that return more than one value, the number of outputs is expected to be specified using the 'nout' argument, or the return value converted to a tuple. This is why the create_file task has nout set to 2: it returns two values. And this is the end of this tutorial; if you enjoyed it, please drop positive feedback, follow and share. You can also follow me on Twitter @madeofajala. Thank you and see you later✌🏾.