Merging Huuuge CSV Files Using Golang Channels

More often than not in engineering, the saying all you need is less holds true. The story I’d like to share today certainly belongs in that category. It is about a fearless SRE working at a fitness company who had the honorable task of migrating exercise data from a not-so-reliable database to a shiny, new microservice.

What in the good world is exercise data, you may ask? Legitimate question! Exercise data is generated by tens of thousands of strength machines, organized in training circles, in gyms and fitness studios all over the world. These machines challenge our users, make them sweat, and help them reach their training goals. Awesome! But these machines also create a ton of data every time they are used:

  • Metadata, that is, data identifying the machine, which training circle it is assigned to, and a bunch of other relevant data points.
  • Strength Sets, that is, data about the user’s performance and strength distribution over time while pushing or pulling a machine’s lever. Even when compressed and saved as a protocol buffer, this data lies in the two-digit kilobyte range per set.

An exercise can have one or more strength sets, depending on the number of consecutive training intervals a user had on that machine before switching to another one. All of this data was about to be migrated.

After the SRE team had spent a questionable amount of engineering effort and felt responsible for a shockingly large share of the monthly cloud bill, the situation was as follows:

  • The most recent data has been fully migrated to the new service.
  • The remaining strength sets have been parked in Cloud Storage; each strength set is stored in its own protobuf file.
  • The expensive and unreliable database has been shut off for good. 💔
  • About half a billion exercises from the early days were still awaiting migration.
  • Metadata for these half a billion exercises has been extracted from a variety of data sources and consolidated into two CSV files.

The Data

Note: Data structures in this example are simplified, leaving out the majority of the columns, a sophisticated pipeline of data augmentation, and backend-specific optimizations. All the data presented here is sourced from pure imagination. 🐸

Exercise metadata contains the exercise ID and some additional columns:

id  user_id  machine_id  circle_id  timestamp
23  1        100         7          1522032074
24  1        204         7          1522032317
25  4711     34          99         1522032161
26  90       70          99         1522012462
27  1        101         7          1522032728

The file metadata.csv looks like this:

id,user_id,machine_id,circle_id,timestamp
23,1,100,7,1522032074
24,1,204,7,1522032317
25,4711,34,99,1522032161
26,90,70,99,1522012462
27,1,101,7,1522032728

Let’s assume this file was a couple of gigabytes in size. Just large enough that it would not fit into the memory of any reasonably priced compute instance.

The other file is strength_sets.csv. It holds the mapping from an exercise’s ID to one or more strength set IDs.

id  strength_set_id
23  13402394
23  13402894
23  48402394
24  84023943
25  26382958
25  23801213
26  93208493
26  93204762
27  94800394

Remember, the strength sets are located in a bucket on Cloud Storage. They shall not be moved. A bucket is just the right place for them to live for now.

Although much smaller than the previous file, it is still large enough that one would not want to squeeze it into memory. The file looks like this:

id,strength_set_id
23,13402394
23,13402894
23,48402394
24,84023943
25,26382958
25,23801213
26,93208493
26,93204762
27,94800394

The Goal

For the final migration of the exercises we want to create a file exercises.csv that should contain something like this:

id  user_id  machine_id  circle_id  timestamp   strength_set_ids
23  1        100         7          1522032074  13402394 13402894 48402394
24  1        204         7          1522032317  84023943
25  4711     34          99         1522032161  26382958 23801213
26  90       70          99         1522012462  93208493 93204762
27  1        101         7          1522032728  94800394

Metadata and strength set IDs shall be merged so that an importer tool could easily ingest the exercises one at a time.

In pure CSV this would be:

id,user_id,machine_id,circle_id,timestamp,strength_set_ids
23,1,100,7,1522032074,13402394 13402894 48402394
24,1,204,7,1522032317,84023943
25,4711,34,99,1522032161,26382958 23801213
26,90,70,99,1522012462,93208493 93204762
27,1,101,7,1522032728,94800394
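
An importer that consumes exercises.csv (not part of this post) could split the last column on spaces to get the individual set IDs back. Here is a minimal sketch of that, with a hypothetical record hard-coded for illustration:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// one merged record from exercises.csv, already split on commas
	// (e.g. by encoding/csv); the last field holds the set IDs
	record := []string{"23", "1", "100", "7", "1522032074", "13402394 13402894 48402394"}
	setIDs := strings.Fields(record[len(record)-1])
	fmt.Println(setIDs) // [13402394 13402894 48402394]
}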

Approaches

There are plenty of ways to achieve this, but only a few really pay respect to the tagline from earlier: All you need is less!

Not so great ideas:

  • We could load everything into memory and nest some for loops. Nah, too expensive. Also too easy 😉
  • We could throw all the data into an SQL database and then run a fancy query to get exactly what we want. Hell no, this data was just painfully extracted from a complex SQL schema. We had better keep a safe distance from SQL for now. Also, this would be too easy 🤨
  • We could iterate over metadata.csv and, for every ID, iterate over all lines in strength_sets.csv and copy over the set IDs we need. Sorry, nope! That’s O(n*m), with n being the number of lines in one file and m the number of lines in the other. But the direction this is heading is not that bad, actually!

A better idea:

  • Assuming all lines are sorted with respect to the primary identifier in, let’s say, ascending order, we have to iterate over every file only once.
  • For the metadata, we don’t need access to records that we have already processed. We will never look them up again. The same holds true for records that are still to be processed. We don’t need a line any sooner than the very moment we want to process that line. No lookups here, either.
  • The situation is similar for the strength sets. We only need access to a couple of them at a time, because if everything is sorted, we will find equal IDs next to each other. Once the ID changes, we know there are no further mappings for that exercise’s ID.
  • This solution has no lookups at all. Not even a single hashmap, usually an SRE’s favorite data structure. 😯 (A small in-memory sketch of this single-pass merge follows right after this list.)
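
To make the idea concrete before we build the streaming version, here is a minimal in-memory sketch of the single-pass merge over two sorted slices. The types and the tiny sample data are made up for this sketch only; the real files would of course never fit into slices like these.

package main

import "fmt"

// meta and set mirror one row of each input file (sketch-only helper types)
type meta struct {
	id   int
	rest string
}

type set struct {
	id    int
	setID string
}

func main() {
	// tiny, already sorted sample data taken from the tables above
	metadata := []meta{
		{23, "1,100,7,1522032074"},
		{24, "1,204,7,1522032317"},
	}
	sets := []set{
		{23, "13402394"}, {23, "13402894"}, {23, "48402394"},
		{24, "84023943"},
	}

	j := 0
	for _, m := range metadata {
		out := fmt.Sprintf("%d,%s", m.id, m.rest)
		sep := ","
		// walk over the sorted set IDs exactly once, never looking back
		for j < len(sets) && sets[j].id <= m.id {
			if sets[j].id == m.id {
				out += sep + sets[j].setID
				sep = " "
			}
			j++
		}
		fmt.Println(out)
	}
}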

Testing The Assumption

So far we have just assumed that the data sets are sorted. We should test this assumption. Of course, by writing a small program for that.

package main

import (
	"bufio"
	"log"
	"os"
	"strconv"
	"strings"
)

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	lastID := -1
	for scanner.Scan() {
		columns := strings.Split(scanner.Text(), ",")
		id, err := strconv.Atoi(columns[0])
		if err != nil {
			log.Fatalf("ParseInt: %v", err)
		}
		if id <= lastID {
			log.Fatal("😭")
		}
		lastID = id
	}
	if err := scanner.Err(); err != nil {
		log.Fatal("scanner: %v", err)
	}
	log.Println("👍")
}

Let’s run it, skipping the header line of the CSV:

$ tail -n +2 metadata.csv | go run main.go
2018/03/25 21:53:25 👍

Looking good!

For the strength sets, we have to modify the test. Here we are OK with duplicate IDs as long as they are consecutive. We change the condition from id <= lastID to id < lastID. That’s all.

✂️
		if id < lastID {
			log.Fatal("😭")
		}
✂️

Let’s run this one, too:

$ tail -n +2 strength_sets.csv | go run main.go
2018/03/25 21:56:22 👍

Great! Now we can implement our memory-saving approach.

The Implementation

The architecture of our program looks like this: two reader goroutines, one per input file, parse lines and send them into channels; a joiner goroutine merges the two sorted streams; and main() prints the merged lines.

First, we define a struct that holds a single CSV line. We use a simplified format that treats the ID specially but leaves everything else as it is.

type line struct {
	id         int
	restOfLine string
}
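
For illustration, the first data row of metadata.csv would end up in this struct like so (a hypothetical example value):

// the first data row of metadata.csv, parsed into the struct above
var l = line{id: 23, restOfLine: "1,100,7,1522032074"}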

Since we will be reading lines from two different files, a little helper function that abstracts away and synchronizes the line intake comes in handy. Its job is to read lines, parse the ID, and send the result to a channel for easier consumption by other functions. The function takes a filename and a channel as parameters.

func reader(fname string, out chan<- *line) {
	defer close(out) // close channel on return

	// open the file
	file, err := os.Open(fname)
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	header := true
	for scanner.Scan() {
		var l line
		columns := strings.SplitN(scanner.Text(), ",", 2)
		// ignore first line (header)
		if header {
			header = false
			continue
		}
		// convert ID to integer for easier comparison
		id, err := strconv.Atoi(columns[0])
		if err != nil {
			log.Fatalf("ParseInt: %v", err)
		}
		l.id = id
		l.restOfLine = columns[1]
		// send the line to the channel
		out <- &l
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}

While testing our assumption we read from stdin; here we read from a file, but the rest is quite similar. We parse the first column into an int to make it easier to match IDs from both input streams later. Pointers to the parsed lines are sent to the outbound channel for further processing. That processing happens in another function, let’s call it joiner().

func joiner(metadata, setIDs <-chan *line, out chan<- *line) {
	defer close(out) // close channel on return

	si := &line{}
	for md := range metadata {
		sep := ","
		// add matching strength set (if left over from previous iteration)
		if si.id == md.id {
			md.restOfLine += sep + si.restOfLine
			sep = " "
		}
		// look for matching strength sets
		for si = range setIDs {
			// add all strength sets with matching IDs
			if si.id == md.id {
				md.restOfLine += sep + si.restOfLine
				sep = " "
			} else if si.id > md.id {
				break
			}
			// skip all strength sets with lower IDs
		}
		// send the augmented line into the channel
		out <- md
	}
}

Unsurprisingly, this function joins the matching data from our two input channels. To do so, it reads from the metadata channel to fetch a line of exercise metadata. Then it reads from the strength set channel, skipping all lines with a lower ID and appending the values of all lines with a matching ID. A pointer to the augmented line goes to the outbound channel.

In our main() function we fire up the workers and channels and print out the augmented lines.

func main() {
	// read metadata
	metadataChan := make(chan *line)
	go reader("metadata.csv", metadataChan)

	// read strength set IDs
	strengthSetChan := make(chan *line)
	go reader("strength_sets.csv", strengthSetChan)

	// join the two data streams
	augmentedLinesChan := make(chan *line)
	go joiner(metadataChan, strengthSetChan, augmentedLinesChan)

	// write computed lines
	fmt.Println("id,user_id,machine_id,circle_id,timestamp,strength_set_ids")
	for l := range augmentedLinesChan {
		fmt.Printf("%v,%v\n", l.id, l.restOfLine)
	}
}

And that’s all! The design of the program is agnostic to the size of the input files.

$ go run main.go
id,user_id,machine_id,circle_id,timestamp,strength_set_ids
23,1,100,7,1522032074,13402394 13402894 48402394
24,1,204,7,1522032317,84023943
25,4711,34,99,1522032161,26382958 23801213
26,90,70,99,1522012462,93208493 93204762
27,1,101,7,1522032728,94800394

I successfully ran this program to convert hundreds of gigabytes of exercises with as little as a couple of megabytes of memory consumption. 🎉

Conclusion

Sometimes it is enough to evaluate a few simple assumptions to unlock a scaling approach. Once we are able to divide and conquer, things become easy very quickly.

Bonus: Try modifying the code to run concurrent joiner() goroutines to make use of all CPU cores (Hint: Modulus).
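
One possible way to tackle the bonus, as a rough sketch rather than a drop-in solution: shard the work by id modulo the number of workers and give every shard its own joiner(). The sketch reuses line, reader(), and joiner() from above and needs "sync" added to the imports. Each worker simply re-reads both files and keeps only the lines of its shard, which trades some redundant parsing for a setup without any coordination between workers; note also that the merged output is no longer globally ordered.

// filteredReader behaves like reader() above but only forwards lines whose
// ID falls into this worker's shard (id % workers == remainder).
func filteredReader(fname string, workers, remainder int, out chan<- *line) {
	defer close(out)
	in := make(chan *line)
	go reader(fname, in)
	for l := range in {
		if l.id%workers == remainder {
			out <- l
		}
	}
}

func main() {
	const workers = 4

	merged := make(chan *line)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		// each worker gets its own filtered input streams and its own joiner
		md := make(chan *line)
		go filteredReader("metadata.csv", workers, i, md)
		sets := make(chan *line)
		go filteredReader("strength_sets.csv", workers, i, sets)

		out := make(chan *line)
		go joiner(md, sets, out)

		wg.Add(1)
		go func(out <-chan *line) { // fan-in: forward this worker's results
			defer wg.Done()
			for l := range out {
				merged <- l
			}
		}(out)
	}
	go func() { wg.Wait(); close(merged) }()

	fmt.Println("id,user_id,machine_id,circle_id,timestamp,strength_set_ids")
	for l := range merged {
		fmt.Printf("%v,%v\n", l.id, l.restOfLine)
	}
}

If ordered output matters, each worker could instead write its own (still sorted) shard file, and those files could be merged in one final pass.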