What are Issues?

In the previous post, I wrote about issues, what they represent, and their potential impact on modeling. To recap, a timeseries signal at time \(t\) for location \(s\) typically has values \(x_{s,t}\). However, when we account for phenomena like back-fill, each of these values can have multiple issues (versions), which are better described as \(x_{s,t,u}, u \ge t\), where \(u\) is another dimension of time that describes the modification date. By keeping track of issues, we can have nice features like as-of views of the data.
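To make the as-of idea concrete, here is a minimal sketch. The dictionary layout, the function name `as_of`, and the toy values are all made up for illustration and are not how the real database stores issues.

```python
from datetime import date

def as_of(issues, location, time, as_of_date):
    """Return the value of x_{s,t} as it was known on as_of_date.

    issues maps (location, time) to a list of (issue_date, value) pairs.
    Returns None if no issue existed yet on as_of_date.
    """
    known = [(u, v) for (u, v) in issues.get((location, time), []) if u <= as_of_date]
    if not known:
        return None
    # The most recent issue on or before as_of_date wins
    return max(known, key=lambda pair: pair[0])[1]

# Toy data: one value that was back-filled two days after first being reported
issues = {
    ("pa", date(2020, 5, 1)): [
        (date(2020, 5, 1), 10.0),
        (date(2020, 5, 3), 12.0),
    ]
}

print(as_of(issues, "pa", date(2020, 5, 1), date(2020, 5, 2)))
print(as_of(issues, "pa", date(2020, 5, 1), date(2020, 5, 3)))
```

The first call returns the original value because the back-fill had not happened yet on 2 May, and the second returns the revised one.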

So tracking issues is great, but what if we did not do that from the start? We could start tracking issues for newly-ingested data, but existing data would require some sort of migration. This post aims to document some of the (mainly memory) hurdles I came across when trying to develop a tool to assist in such a migration.

Context: What Do We Have?

It was fortunate that at Delphi, daily SQL database backups were stored which allowed us to approximately reconstruct issues for existing data. The main idea is:

  1. For some timeseries \(x_{s,t}\), we have backups of it at times \(u-1\) and \(u\).
  2. Find the differences from backup \(u-1\) to \(u\).
  3. If \(x_{s,t}\) is identical between both backups, then there probably was no new issue of it.
  4. If \(x_{s,t}\) differs between both backups, then there probably was an issue \(x_{s,t,u}\) with value from the latter backup.
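The steps above can be sketched on toy data as follows. This is only an illustration: each backup is assumed to be a small in-memory dict keyed by \((s, t)\), the name `reconstruct_issues` is hypothetical, and everything in the very first backup is treated as a new issue, which is an assumption of this sketch rather than a statement about the real tool.

```python
def reconstruct_issues(backups):
    """Approximate issues from a sequence of daily snapshots.

    backups is a list of (u, snapshot) pairs sorted by backup time u,
    where snapshot maps (s, t) to a value. Returns a dict mapping
    (s, t, u) to the value first seen in backup u.
    """
    issues = {}
    previous = {}
    for u, snapshot in backups:
        for (s, t), value in snapshot.items():
            # A key that is new, or whose value differs from the
            # previous backup, is recorded as an issue at time u
            if (s, t) not in previous or previous[(s, t)] != value:
                issues[(s, t, u)] = value
        previous = snapshot
    return issues

backups = [
    (1, {("pa", 1): 10}),
    (2, {("pa", 1): 12, ("pa", 2): 5}),   # back-fill of t=1, new value for t=2
    (3, {("pa", 1): 12, ("pa", 2): 5}),   # nothing changed
]
issues = reconstruct_issues(backups)
print(issues)
```

Because only adjacent backups are compared, a value that changed twice within one day would show up as a single issue, which is why the reconstruction is approximate.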

We can keep repeating this process between adjacent days of backups to produce our issues! The problem was that each of these backups was a large, compressed text file (in SQL INSERT INTO format), and the files got bigger the more recent they were.

There were several months of backup files to churn through, and no readily available clusters to run Hadoop or Spark on (at that time). The migration tool eventually managed to process all the backup files within a day on a single remote server with a few cores and 64 GB of RAM, but not without some extensive tuning! Here are some of the things I learnt while working on this tool.

Stick to Iterators

One of the steps in the tool’s pipeline was to partition large CSV files into smaller subsets based on a particular categorical column, signal source. Such partitioning allows us to process the data at a more granular level, controlling how many subsets we want to process in parallel or which subsets to omit totally.

A simplified version of this task involves just filtering down a CSV by a value for a specific column. A common list-based approach might look like this:

def filter_csv_list(
    csv_in: str, csv_out: str,
    col_idx: int, col_val: str
):
    with open(csv_in, "r") as f_in:
        with open(csv_out, "w") as f_out: 

            lines = f_in.readlines()
            for line in lines:

                value = line.split(",")[col_idx]

                if value == col_val:
                    f_out.write(line)

If we profile the memory usage on a ~200 MB CSV file, we get:

%memit filter_csv_list("data/cc-est2018-alldata.csv", "data/cc_list.csv", 1, "Pennsylvania")
>>> peak memory: 254.37 MiB, increment: 205.34 MiB

What we are focusing on is the increment, or how much memory this particular line used. It seems like filtering this ~200 MB CSV file required ~200 MB of memory, but can we do better?

def filter_csv_iter(
    csv_in: str, csv_out: str,
    col_idx: int, col_val: str
):
    with open(csv_in, "r") as f_in:
        with open(csv_out, "w") as f_out: 

            for line in f_in:

                value = line.split(",")[col_idx]

                if value == col_val:
                    f_out.write(line)

The change is minor. We drop readlines(), a commonly used function that returns a list of all the lines, and iterate over the file object f_in directly, which yields one line at a time. Profiling this similarly, we get:

%memit filter_csv_iter("data/cc-est2018-alldata.csv", "data/cc_list.csv", 1, "Pennsylvania")
>>> peak memory: 49.89 MiB, increment: 0.00 MiB

Now the increment is ~0 MB, a great difference! This sounds more like it, as we really do not need the whole file in memory to filter it down. We can immediately decide for each row whether to keep it or not.
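If the %memit magic is not available, a similar comparison can be approximated with the standard library's tracemalloc module. This is a minimal sketch on a generated toy file, not the census CSV used above; the helper names are made up, and tracemalloc only counts memory allocated by Python itself, so the numbers will not match %memit exactly.

```python
import os
import tempfile
import tracemalloc

def count_rows_list(path):
    # readlines() materializes every line in a list before we count
    with open(path, "r") as f:
        return len(f.readlines())

def count_rows_iter(path):
    # Iterating over the file object only keeps the current line around
    with open(path, "r") as f:
        return sum(1 for _ in f)

def peak_bytes(func, *args):
    """Return the peak number of bytes Python allocated while func ran."""
    tracemalloc.start()
    try:
        func(*args)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy.csv")
    with open(path, "w") as f:
        for i in range(50_000):
            f.write(f"{i},Pennsylvania,{i * 2}\n")
    peak_list = peak_bytes(count_rows_list, path)
    peak_iter = peak_bytes(count_rows_iter, path)

print(f"list: {peak_list} B, iterator: {peak_iter} B")
```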

The change is also subtle, as Python makes switching between iterators and lists seamless, which could be a good or bad thing. In the spirit of keeping memory utilization down, this meant carefully sticking to iterators as much as possible and only loading data as it is needed. The same ideas were used for CSV partitioning here in the tool.
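As a rough illustration of how the same streaming idea extends from filtering to partitioning, here is a minimal sketch. It is not the tool's actual implementation: the function name `partition_csv_by_col` and the source names are hypothetical, and it assumes no header row, column values that are safe to use as file names, and few enough distinct values that one open file per value is acceptable.

```python
import csv
import os
import tempfile
from contextlib import ExitStack

def partition_csv_by_col(csv_in, out_dir, col_idx):
    """Stream csv_in once and write each row to out_dir/<column value>.csv.

    Returns the sorted list of distinct column values seen.
    """
    writers = {}
    # ExitStack closes the input and every output file opened along the way
    with ExitStack() as stack:
        f_in = stack.enter_context(open(csv_in, "r", newline=""))
        for row in csv.reader(f_in):
            key = row[col_idx]
            if key not in writers:
                out_path = os.path.join(out_dir, f"{key}.csv")
                f_out = stack.enter_context(open(out_path, "w", newline=""))
                writers[key] = csv.writer(f_out)
            writers[key].writerow(row)
    return sorted(writers)

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_in = os.path.join(tmp_dir, "all.csv")
    with open(csv_in, "w", newline="") as f:
        f.write("src_a,pa,1\nsrc_b,pa,2\nsrc_a,ny,3\n")
    sources = partition_csv_by_col(csv_in, tmp_dir, 0)
    with open(os.path.join(tmp_dir, "src_a.csv"), newline="") as f:
        src_a_rows = list(csv.reader(f))

print(sources, src_a_rows)
```

Only one input row is held at a time, so memory use stays flat no matter how large the input file is.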

Save memory by preferring generators, using the built-in itertools, and verifying you are using iterator-based functions!
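As one small example of combining a generator with itertools, the sketch below batches any iterable into fixed-size chunks without ever materializing the whole input. The helper name `read_in_chunks` is made up for this illustration.

```python
from itertools import islice

def read_in_chunks(items, chunk_size):
    """Lazily yield lists of at most chunk_size items from any iterable."""
    iterator = iter(items)
    while True:
        # islice pulls only the next chunk_size items from the iterator
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk

chunks = read_in_chunks(range(5), 2)
first = next(chunks)      # nothing beyond the first chunk has been read yet
rest = list(chunks)
print(first, rest)
```

A quick way to check whether a function is iterator-based is to see if its result supports next() without first being wrapped in iter(); lists do not, while generators and file objects do.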

CSV Differencing

csvdiff

The core of this tool performs differencing between CSV files (reformatted SQL backup files). Being the step with the bulk of the processing and memory utilization, optimizing this is of particular importance.

For prototyping, I initially looked at existing CSV differencing tools to not re-invent the wheel if possible. csvdiff was great, and I ended up using it quite frequently in other areas when dealing with CSVs. It finds the indexed differences quickly and returns JSON describing adds, changes, and deletes like: (credit: csvdiff docs)

{
  "_index": [ "id" ],
  "added": [
    { "amount": "81", "id": "5", "name": "mira" },
    ...
  ],
  "changed": [
    {
      "fields": {
        "amount": { "from": "20", "to": "23" }
      },
      "key": ["1"]
    },
    ...
  ],
  "removed": [
    { "amount": "63", "id": "2", "name": "eva" },
    ...
  ]
}

This is a very detailed report and is great for parsing with tools like jq. However, we are really only interested in the adds and changes, and in particular the to value of the changes only. It was fast to create a prototype tool with csvdiff and it worked well on small partitions. However, it quickly exploded in memory utilization on larger partitions and frequently became a victim of the OOM killer.

Why? Probably because the internal data structures and output JSON format of csvdiff are mainly Python ones like List and Dict. When the instance count of structures like List and Dict scales with the data set, we incur increasing memory overhead for the growing number of Python objects.
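The per-object overhead can be illustrated with a toy comparison. This sketch does not measure csvdiff itself; it only contrasts a list of Python float objects with a packed array of the same numbers, using sys.getsizeof, which reports CPython-specific sizes.

```python
import sys
from array import array

n = 100_000
as_list = [float(i) for i in range(n)]
as_array = array("d", as_list)   # packed 8-byte doubles, no per-item object

# A list stores pointers, and each float is a separate Python object
list_bytes = sys.getsizeof(as_list) + sum(sys.getsizeof(x) for x in as_list)
array_bytes = sys.getsizeof(as_array)

print(f"list of floats: {list_bytes} B, packed array: {array_bytes} B")
```

The same effect applies to dicts of strings, where every key and value is its own object with its own header.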

Pandas

Pandas is a great library for working with indexed data, and manages memory utilization well. I ended up implementing my own CSV differ in Pandas, which allowed me to perform the further memory-usage optimizations described below.

To recap, values are roughly indexed in each backup by the signal name \(x\), time \(t\) and location \(s\) to give rise to values \(x_{s,t}\). Between the backups for times \(u-1\) and \(u\), we are only interested in added values (new times or locations) or changed values (back-filled values) in backup \(u\), which gives rise to issue values \(x_{s,t,u}\).

In order to just find the added and changed values, we can do an index-aligned data-frame comparison to select such rows with something like:

def pd_csvdiff(before_csv, after_csv, index_cols):
    # Load data and set indices
    df_before = pd.read_csv(before_csv, ...)
    df_after = pd.read_csv(after_csv, ...)
    df_before.set_index(index_cols, inplace=True)
    df_after.set_index(index_cols, inplace=True)
    ...
    # Align and compare
    same_mask = (df_before.reindex(df_after.index) == df_after)
    is_diff = ~(same_mask.all(axis=1))

    # Extract the different rows only
    return df_after.loc[is_diff, :]

Now why did we bother recreating this functionality in Pandas? Because Pandas provides good control over how we want to store the data in memory, in the form of column dtypes. Specifically, huge memory savings came from representing categorical columns as the Pandas categorical type instead of individual Python objects (strings in this case).

For example, this toy comparison shows ~60x less memory usage just by using dtype="category":

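import numpy as np
import pandas as pd

# 10000 samples, which is the size consistent with the byte counts reported below
colors = np.random.choice(["red", "blue", "yellow"], 10000, replace=True)
s1 = pd.Series(colors)
s2 = pd.Series(colors, dtype="category")
print(f"Bytes used: {s1.memory_usage(deep=True)} B")
print(f"Bytes used: {s2.memory_usage(deep=True)} B")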

>>> Bytes used: 613560 B
    Bytes used: 10392 B

When using categoricals, Pandas only has to store small integers for each entry, along with a mapping going from these small integers to the original object or string. Instead of storing many copies of duplicate strings, we only have to store the unique ones in this mapping. Thus we save on the number of Python objects we have in memory. This was especially useful for representing columns like signal name, state, county, etc.
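The integer codes and the mapping can be inspected directly through the .cat accessor. A minimal sketch on made-up values:

```python
import pandas as pd

s = pd.Series(["red", "blue", "red", "yellow", "red"], dtype="category")

# One small integer per entry
codes = s.cat.codes.tolist()
# The unique values, stored once; code i refers to categories[i]
categories = s.cat.categories.tolist()
code_dtype = str(s.cat.codes.dtype)

print(codes, categories, code_dtype)
```

With only a handful of categories the codes fit in a single byte each, which is where most of the savings come from.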

Edge Conditions

First, missing values are present in the data as NaN values. However, a naive comparison of NaN == NaN evaluates to False by convention (IEEE 754). In our case, a NaN in both backups represents a value that was missing before and is still missing, so we do not want these to be flagged as changed values.
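One way to handle this, sketched here on toy data, is to treat two cells as equal when both are missing. This is an illustrative variant and not necessarily how the full implementation does it; the function name `changed_or_added` is made up, and it assumes both frames have the same columns and that the index of the "before" frame has no duplicates.

```python
import numpy as np
import pandas as pd

def changed_or_added(df_before, df_after):
    """Return rows of df_after that are new or differ from df_before."""
    aligned = df_before.reindex(df_after.index)
    # Equal values, or missing in both frames, count as unchanged
    same = (aligned == df_after) | (aligned.isna() & df_after.isna())
    return df_after.loc[~same.all(axis=1)]

before = pd.DataFrame(
    {"s": ["pa", "pa", "ny"], "t": [1, 2, 1], "value": [1.0, np.nan, 3.0]}
).set_index(["s", "t"])
after = pd.DataFrame(
    {"s": ["pa", "pa", "ny", "ny"], "t": [1, 2, 1, 2], "value": [1.0, np.nan, 3.5, 4.0]}
).set_index(["s", "t"])

diff = changed_or_added(before, after)
print(diff)
```

The row for ("pa", 2) is missing in both frames and is not reported, while the changed ("ny", 1) and the added ("ny", 2) are. One caveat of this sketch is that an added row whose values are all missing would not be flagged either.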

Secondly, there could be a mismatch of category values. The category values are inferred from the individual CSVs, and new values may appear across time. Pandas has to know about the complete set of values when doing comparisons between categoricals, otherwise we are effectively comparing “different” sets. We need to union the category values together before comparisons to prevent such errors.
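A minimal sketch of the union step on two toy series follows; the variable names and values are made up. Without it, pandas refuses to compare two categoricals whose category sets differ.

```python
import pandas as pd

before = pd.Series(["pa", "ny"], dtype="category")
after = pd.Series(["pa", "tx"], dtype="category")   # "tx" is new in this backup

# Give both series the same, complete set of categories
all_categories = before.cat.categories.union(after.cat.categories)
before = before.cat.set_categories(all_categories)
after = after.cat.set_categories(all_categories)

same = (before == after)
print(list(all_categories), same.tolist())
```

Both series stay categorical after set_categories, so the memory savings are kept.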

The full implementation along with handling for these edge cases can be found here

Multiprocessing: Setup & Challenges

By now, I had nicely partitioned up the data and differencing within these partitions no longer caused memory utilization to explode. There were many partitions to churn through, but we were doing them one at a time with cores and memory to spare. I took the simple route of parallelizing this tool by using Python’s built-in multiprocessing module, specifically Pool.starmap, which very easily lets us map a function of multiple arguments across a list of arguments, in a parallel manner!

# Serially
splitted = starmap(split_csv_by_col, split_args)

# In parallel
with Pool(ncpu) as pool:
    splitted = pool.starmap(split_csv_by_col, split_args)

However, it turns out that processing some partitions required much more memory than others due to the nature of the signals. Having too large an ncpu value would still result in OOM kills when too many of these large partitions got processed together.

The simple approach I took to fixing this was to use more granular ncpu settings for each stage of the pipeline, with a large ncpu for stages like CSV partitioning that used very little memory, and smaller ncpu counts for memory-intensive stages like CSV differencing.
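A rough sketch of what per-stage settings could look like is below. The stage names, the ncpu values, and the `run_stage` helper are all hypothetical, and `add` merely stands in for a real pipeline function.

```python
from itertools import starmap
from multiprocessing import Pool

# Hypothetical per-stage worker counts: many workers for cheap stages,
# few for memory-hungry ones
STAGE_NCPU = {"partition": 8, "diff": 2}

def run_stage(func, args, ncpu):
    """Map func over argument tuples, serially or with ncpu worker processes."""
    if ncpu <= 1:
        return list(starmap(func, args))
    with Pool(ncpu) as pool:
        return pool.starmap(func, args)

def add(a, b):
    # Stand-in for a real stage function
    return a + b

if __name__ == "__main__":
    print(run_stage(add, [(1, 2), (3, 4)], STAGE_NCPU["diff"]))
```

Keeping a serial path for ncpu of 1 also makes it easier to debug a single stage without any multiprocessing involved.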

This was probably not the most efficient way of tackling the issue, but I think it was an acceptable trade-off between being able to kick-start the tool on the backups and spending more time developing a better solution. In hindsight, I would have probably tried to implement a simple scheduler that estimated the memory needed for each partition from file sizes and then scheduled more small partitions together.
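Such a scheduler was never built, so the following is only a sketch of one possible shape for it: a greedy first-fit packing of partitions into batches under a memory budget. The function name, the `expansion_factor` of 5 (a guess at how much larger a partition is in memory than on disk), and the toy sizes are all assumptions.

```python
def batch_by_memory(partition_sizes, memory_budget, expansion_factor=5.0):
    """Group partitions into batches whose estimated memory fits the budget.

    partition_sizes maps a partition name to its file size. The estimated
    memory need is file size times expansion_factor. A partition larger
    than the whole budget ends up alone in its own batch.
    """
    estimates = {name: size * expansion_factor for name, size in partition_sizes.items()}
    batches = []  # each entry is [remaining_budget, [partition names]]
    # Place the largest partitions first, each in the first batch with room
    for name in sorted(estimates, key=estimates.get, reverse=True):
        need = estimates[name]
        for batch in batches:
            if batch[0] >= need:
                batch[0] -= need
                batch[1].append(name)
                break
        else:
            batches.append([memory_budget - need, [name]])
    return [names for _, names in batches]

sizes = {"a": 10, "b": 6, "c": 4, "d": 1}   # toy file sizes
batches = batch_by_memory(sizes, memory_budget=60)
print(batches)
```

Each batch could then be run in parallel with as many workers as it has partitions, so small partitions share the machine while large ones run nearly alone.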

I also came across deadlock problems when trying to use logging with multiprocessing, but managed to solve them with the multiprocessing_logging package. See here for more details on how I incorporated all these with the rest of the pipeline.

Conclusion

I learnt a lot about processing large data, optimizing for memory usage and multiprocessing in the development of this tool. It was made even harder as I could only test it on small sample backups and did not have access to the real server it would run on! I am glad it was able to churn through all the backups in the end to perform the migration, after several days of optimization and debugging.

I tried to include detailed documentation in the tool’s source code. It is my hope that if you are tackling a similar problem without a Hadoop or Spark cluster, these reflections and documentation will come in useful.


Tagged #covid19, #memory, #data.