Merging Huuuge CSV Files Using Golang Channels

More often than not in engineering, the saying all you need is less holds true. The story I’d like to share today certainly belongs in that category. It is about a fearless SRE working at a fitness company who had the honorable task of migrating exercise data from a not-so-reliable database to a shiny, new microservice.

What in the good world is exercise data, you may ask? Legitimate question! Exercise data is generated by tens of thousands of strength machines, organized in training circles, in gyms and fitness studios all over the world. These machines challenge our users, make them sweat, and help them reach their training goals. Awesome! But these machines also create a ton of data every time they are used:

  • Metadata, that is, data identifying the machine, the training circle it is assigned to, and a bunch of other relevant data points.
  • Strength sets, that is, data about the user’s performance and strength distribution over time while pushing or pulling a machine’s lever. Even when compressed and saved as a protocol buffer, this data is in the two-digit kilobyte range per set.

An exercise can have one or more strength sets, depending on the number of consecutive training intervals a user had on that machine before switching to another machine. All of this data was about to be migrated.

After the SRE team had spent a questionable amount of engineering effort and was responsible for a shockingly large share of the monthly cloud bill, the situation was as follows:

  • The most recent data had been fully migrated to the new service.
  • The remaining strength sets had been parked on Cloud Storage, each strength set stored in its own protobuf file.
  • The expensive and unreliable database had been shut off for good. 💔
  • About half a billion exercises from the early days were still awaiting migration.
  • Metadata for these half a billion exercises had been extracted from a variety of data sources and consolidated into two CSV files.

The Data

Note: Data structures in this example are simplified, leaving out the majority of the columns, a sophisticated pipeline of data augmentation, and backend-specific optimizations. All the data presented here is sourced from pure imagination. 🐸

Exercise metadata contains the exercise ID and some additional columns:

id  user_id  machine_id  circle_id  timestamp
23  1        100         7          1522032074
24  1        204         7          1522032317
25  4711     34          99         1522032161
26  90       70          99         1522012462
27  1        101         7          1522032728

The file metadata.csv looks like this:

id,user_id,machine_id,circle_id,timestamp
23,1,100,7,1522032074
24,1,204,7,1522032317
25,4711,34,99,1522032161
26,90,70,99,1522012462
27,1,101,7,1522032728

Let’s assume this file was a couple of gigabytes in size. Just large enough that it would not fit into the memory of any reasonably priced compute instance.

The other file is strength_sets.csv. It holds the mapping from an exercise’s ID to one or more strength set IDs.

id  strength_set_id
23  13402394
23  13402894
23  48402394
24  84023943
25  26382958
25  23801213
26  93208493
26  93204762
27  94800394

Remember, the strength sets are located in a bucket on Cloud Storage. They shall not be moved. A bucket is just the right place for them to live for now.

Although much smaller than the previous file, it is also large enough that one would not want to squeeze it into memory. The file looks like this:

id,strength_set_id
23,13402394
23,13402894
23,48402394
24,84023943
25,26382958
25,23801213
26,93208493
26,93204762
27,94800394

The Goal

For the final migration of the exercises we want to create a file exercises.csv that should contain something like this:

id  user_id  machine_id  circle_id  timestamp   strength_set_ids
23  1        100         7          1522032074  13402394 13402894 48402394
24  1        204         7          1522032317  84023943
25  4711     34          99         1522032161  26382958 23801213
26  90       70          99         1522012462  93208493 93204762
27  1        101         7          1522032728  94800394

Metadata and strength set IDs shall be merged so that an importer tool could easily ingest the exercises one at a time.

In pure CSV this would be:

id,user_id,machine_id,circle_id,timestamp,strength_set_ids
23,1,100,7,1522032074,13402394 13402894 48402394
24,1,204,7,1522032317,84023943
25,4711,34,99,1522032161,26382958 23801213
26,90,70,99,1522012462,93208493 93204762
27,1,101,7,1522032728,94800394

Approaches

There are plenty of ways to achieve this, but only a few really pay respect to the tagline from earlier: all you need is less!

Not so great ideas:

  • We could load everything into memory and nest some for loops. Nah, too expensive. Also too easy 😉
  • We could throw all the data into an SQL database and then run a fancy query to get exactly what we want. Hell no, this data was just painfully extracted from a complex SQL schema. We’d better keep a safe distance from SQL for now. Also, this would be too easy 🤨
  • We could iterate over metadata.csv and, for every ID, iterate over all lines in strength_sets.csv and copy over the set IDs we need. Sorry, nope! That’s O(n*m), with n being the number of lines in one file and m being the number of lines in the other file. But the direction this is heading is not that bad, actually!

A better idea:

  • Assuming all lines are sorted with respect to the primary identifier in, let’s say, ascending order, we have to iterate over every file only once.
  • For the metadata, we don’t need access to records that we have already processed. We will never look them up again. The same holds true for records that are still to be processed. We don’t need a line any sooner than the very moment we want to process that line. No lookups here, either.
  • The situation is similar for the strength sets. We only need access to a couple of them at a time, because if everything is sorted, we will find equal IDs next to each other. Once the ID changes, we know there are no further mappings for that exercise’s ID.
  • This solution has no lookups at all. Not even a single hashmap, usually an SRE’s favorite data structure. 😯

Testing The Assumption

So far we have just assumed that the data sets are sorted. We should test this assumption. Of course, by writing a small program for that.

package main

import (
	"bufio"
	"log"
	"os"
	"strconv"
	"strings"
)

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	lastID := -1
	for scanner.Scan() {
		columns := strings.Split(scanner.Text(), ",")
		id, err := strconv.Atoi(columns[0])
		if err != nil {
			log.Fatalf("ParseInt: %v", err)
		}
		if id <= lastID {
			log.Fatal("😭")
		}
		lastID = id
	}
	if err := scanner.Err(); err != nil {
		log.Fatalf("scanner: %v", err)
	}
	log.Println("👍")
}

Let’s run it, skipping the header line of the CSV:

$ tail -n +2 metadata.csv | go run main.go
2018/03/25 21:53:25 👍

Looking good!

For the strength sets, we have to modify the test. Here we are OK with duplicate IDs as long as they are consecutive. We change the condition from id <= lastID to id < lastID. That’s all.

✂️
		if id < lastID {
			log.Fatal("😭")
		}
✂️

Let’s run this one, too:

$ tail -n +2 strength_sets.csv | go run main.go
2018/03/25 21:56:22 👍

Great! Now we can implement our memory-saving approach.

The Implementation

The architecture of our program is simple: two reader goroutines parse the CSV files and feed their lines into channels, a joiner goroutine merges the two streams, and main() prints the joined lines.

First, we define a struct that holds a single CSV line. We will be using a simplified format that treats the ID specially but leaves everything else as it is.

type line struct {
	id         int
	restOfLine string
}

Since we will be reading lines from two different files, a little helper function that abstracts away and synchronizes the line intake comes in handy. Its job is to read lines, parse the ID, and send the result to a channel for easier consumption by other functions. The function takes a filename and a channel as parameters.

func reader(fname string, out chan<- *line) {
	defer close(out) // close channel on return

	// open the file
	file, err := os.Open(fname)
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	header := true
	for scanner.Scan() {
		var l line
		columns := strings.SplitN(scanner.Text(), ",", 2)
		// ignore first line (header)
		if header {
			header = false
			continue
		}
		// convert ID to integer for easier comparison
		id, err := strconv.Atoi(columns[0])
		if err != nil {
			log.Fatalf("ParseInt: %v", err)
		}
		l.id = id
		l.restOfLine = columns[1]
		// send the line to the channel
		out <- &l
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}

We read from stdin while testing our assumptions; here we read from a file, but the rest is quite similar. We parse the first column into an int to make it easier to match IDs from both input streams later. Pointers to the parsed lines are sent to the outbound channel for further processing. Processing is done in another function, let’s call it joiner().

func joiner(metadata, setIDs <-chan *line, out chan<- *line) {
	defer close(out) // close channel on return

	si := &line{}
	for md := range metadata {
		sep := ","
		// add matching strength set (if left over from previous iteration)
		if si.id == md.id {
			md.restOfLine += sep + si.restOfLine
			sep = " "
		}
		// look for matching strength sets
		for si = range setIDs {
			// add all strength sets with matching IDs
			if si.id == md.id {
				md.restOfLine += sep + si.restOfLine
				sep = " "
			} else if si.id > md.id {
				break
			}
			// skip all strength sets with lower IDs
		}
		// send the augmented line into the channel
		out <- md
	}
}

Unsurprisingly, this function joins the matching data from our two input channels. To do so, it reads from the metadata channel to fetch a line with exercise metadata. Then it reads from the strength set channel, skipping all lines that have a lower ID and appending all values from lines that have a matching ID. A pointer to the augmented line goes to the outbound channel.

In our main() function we fire up the workers and channels and print out the augmented lines.

func main() {
	// read metadata
	metadataChan := make(chan *line)
	go reader("metadata.csv", metadataChan)

	// read strength set IDs
	strengthSetChan := make(chan *line)
	go reader("strength_sets.csv", strengthSetChan)

	// join the two data streams
	augmentedLinesChan := make(chan *line)
	go joiner(metadataChan, strengthSetChan, augmentedLinesChan)

	// write computed lines
	fmt.Println("id,user_id,machine_id,circle_id,timestamp,strength_set_ids")
	for l := range augmentedLinesChan {
		fmt.Printf("%v,%v\n", l.id, l.restOfLine)
	}
}

And that’s all! The design of the program is agnostic to the size of the input files.

$ go run main.go
id,user_id,machine_id,circle_id,timestamp,strength_set_ids
23,1,100,7,1522032074,13402394 13402894 48402394
24,1,204,7,1522032317,84023943
25,4711,34,99,1522032161,26382958 23801213
26,90,70,99,1522012462,93208493 93204762
27,1,101,7,1522032728,94800394

I successfully ran this program to convert hundreds of gigabytes of exercises with as little as a couple of megabytes of memory consumption. 🎉

Conclusion

Sometimes it is enough to evaluate a few simple assumptions to unlock a scaling approach. Once we are able to divide and conquer, things become easy very quickly.

Bonus: Try modifying the code to run concurrent joiner() goroutines to make use of all CPU cores (Hint: Modulus).
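
As a possible starting point for that bonus (a rough sketch, not part of the program above, assuming the line struct and reader() stay unchanged): metadata lines could be fanned out to several joiner() goroutines, sharded by ID modulo the number of workers. Each worker would also need its own, identically sharded strength-set stream, and the per-worker results would have to be recombined in ID order before printing.

func fanOut(in <-chan *line, outs []chan *line) {
	// Close all worker channels once the input is drained.
	defer func() {
		for _, ch := range outs {
			close(ch)
		}
	}()
	// Shard lines across workers by ID modulo the worker count.
	for l := range in {
		outs[l.id%len(outs)] <- l
	}
}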

How to score The Daily Show tickets using Twilio and Golang

I’m a big fan of Comedy Central’s The Daily Show, previously hosted by Jon Stewart, who handed over to Trevor Noah in 2015. The show is recorded in New York City in front of a live studio audience. Tickets to the show are free but limited, and therefore they are hard to get. There are two types of tickets:

  • GENERAL GUARANTEED tickets which guarantee a seat in the audience if you arrive in time, and
  • GENERAL - ENTRY NOT GUARANTEED tickets that might get you in when you wait in line next to the studio. People holding this ticket type may be invited to fill in remaining seats.

On the ticket website there is a little calendar in the top right corner. Dates on which the show will be airing become available in chunks from time to time. Over the short period of time I monitored this, I was not able to find a recurring pattern from which I could predict future ticket rounds. If I wanted to get lucky with a ticket on a specific date, I would have to check the ticket website regularly. I believe humans should delegate repetitive tasks like this one to a computer.

A small program would do the job of

  • checking the website regularly for ticket availability at specific dates and
  • notifying me once tickets are available.

Checking the Dates

To get an idea of how the website was structured and how I could detect changes in the availability of tickets, I took a look at the source code. 👀 The site is mostly plain HTML.

The calendar, however, is rendered via JavaScript.

var dates_avail = {
  "2018-02-20":"style1",
  "2018-02-21":"style1",
  "2018-02-22":"style1",
  ✂️
  "2018-03-28":"style1",
  "2018-03-29":"style1"
};

Luckily, the available dates are stored in an object in the source. Furthermore, to find a particular date I simply needed to search for it in the right notation, for example "2018-02-20":"style1". That string matched nothing but the ticket calendar, and it was present if, and only if, tickets were available on that date. That was so much easier than I thought!

So the checking part of my program was down to simply fetching the website’s source and then running a substring search over the received data.

My weapon of choice, as usual: Golang. I used the http package to fetch the website’s source:

ticketWebsiteURL := "https://www.showclix.com/event/TheDailyShowwithTrevorNoah"
response, err := http.Get(ticketWebsiteURL)
if err != nil {
  ✂️
}
contents, err := ioutil.ReadAll(response.Body)
if err != nil {
  ✂️
}

For the substring search, I started by defining the dates on which I would be in New York with enough spare time to make it to the studio. After all, there is no use blocking a seat that others would be happy to have.

triggerDates := []string{"2018-02-20", "2018-02-21"}

Ranging over the triggerDates I defined the search pattern for each date individually and then used the Contains() function of the strings package.

for _, td := range triggerDates {
  search := "\"" + td + "\":\"style1\""
  if strings.Contains(string(contents), search) {
    // Found it! Let's send a notification.
    ✂️
  }
}
response.Body.Close()

That was the easier part.

Notification via SMS

I spend most of my life in the Central European Time (CET) timezone, but the show is recorded in the Eastern Standard Time (EST) timezone. I assumed the tickets would, therefore, become available sometime during EST office hours, during which I might be asleep.

I needed a notification method that would safely wake me up but not interfere with the quiet hours settings of my phone. I could have used the PagerDuty API, but somehow I found this to be overkill. Overkill, because I have an alert escalation configuration at PagerDuty that truly wakes me up. And my girlfriend, much to her excitement. #not

A communication channel that has been almost forgotten is the good old Short Message Service (SMS). I get so few SMS messages that receiving one can hardly disturb my usual flow. With SMS to the rescue, I configured my phone to notify me of SMS even during quiet hours. Now I only needed to make my program send SMS. That may sound tricky, but in the age of the cloud, this is just a web service away. I headed over to Twilio to register a mobile phone number that I would use as the message source.

Here is how I crafted the message in Go:

msgData := url.Values{}
msgData.Set("To", "+00-MY-PHONE-NUMBER")
msgData.Set("From", "+00-MY-TWILIO-NUMBER")
msgData.Set("Body",
  "Tickets available now ("+info+")! Visit "+ticketWebsiteURL)

Sending the text was then as easy as creating a REST call:

req, _ := http.NewRequest("POST", apiURL, &msgDataReader)
req.SetBasicAuth(accountSid, authToken)
req.Header.Add("Accept", "application/json")
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

There are a few more lines to make a successful call to the Twilio API, of course. Scroll down for the full source.

Wait for it… Win!

With fetching and notifications set up, I just had to run the code. For this I copied it over to my jump host, a compute instance that is running 24/7 anyway and could use some additional load. 😉

04:35:33 waiting...
04:35:33 fetching...
04:50:34 waiting...
05:05:34 fetching...
05:05:35 sms sent!
exit status 1

One day, right after my early workout, the long-awaited text arrived. I got lucky and scored two tickets for one of the dates I wanted. Hooray!

Greetings from The Daily Show with Trevor Noah everyone!

Source Code

Here’s the full source code FYI. Enjoy the show! 🎥

package main

import (
  "encoding/json"
  "io/ioutil"
  "log"
  "net/http"
  "net/url"
  "strings"
  "time"
)

var (
  triggerDates     = []string{"2018-02-20", "2018-02-21"}
  ticketWebsiteURL = "https://www.showclix.com/event/TheDailyShowwithTrevorNoah"
)

func sendSMS(info string) {
  accountSid := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  authToken := "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
  apiURL := "https://api.twilio.com/2010-04-01/Accounts/" +
    accountSid + "/Messages.json"

  msgData := url.Values{}
  msgData.Set("To", "+00-YOUR-NUMBER")
  msgData.Set("From", "+00-YOUR-TWILIO-NUMBER")
  msgData.Set("Body",
    "Tickets available now ("+info+")! Visit "+ticketWebsiteURL)
  msgDataReader := *strings.NewReader(msgData.Encode())

  client := &http.Client{}
  req, _ := http.NewRequest("POST", apiURL, &msgDataReader)
  req.SetBasicAuth(accountSid, authToken)
  req.Header.Add("Accept", "application/json")
  req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

  resp, err := client.Do(req)
  if err != nil {
    log.Printf("twilio: post: %v", err)
    return
  }
  defer resp.Body.Close()
  // Let's be very generous with the status code. We want those tickets!
  if resp.StatusCode >= 200 && resp.StatusCode < 300 {
    var data map[string]interface{}
    decoder := json.NewDecoder(resp.Body)
    if err := decoder.Decode(&data); err != nil {
      log.Printf("twilio: decode json: %v", err)
    }
  } else {
    log.Printf("twilio: status code: %v", resp.Status)
  }
}

func main() {
  for {
    // Behave! Wait 15 minutes to not overload the site or spam their logs.
    log.Printf("waiting...\n")
    time.Sleep(15 * time.Minute)

    // Fetch the website.
    log.Printf("fetching...\n")
    response, err := http.Get(ticketWebsiteURL)
    if err != nil {
      log.Printf("get: %v", err)
      continue
    }
    contents, err := ioutil.ReadAll(response.Body)
    response.Body.Close()
    if err != nil {
      log.Printf("read body: %v", err)
      continue
    }
    // Look for the trigger dates in the source code.
    for _, td := range triggerDates {
      search := "\"" + td + "\":\"style1\""
      if strings.Contains(string(contents), search) {
        // Found it! Send a notification and stop; log.Fatalf exits the program.
        sendSMS(td)
        log.Fatalf("sms sent!")
      }
    }
  }
}

My Little Helper: Slack Bot

As a Site Reliability Engineer (SRE) I spend a significant amount of my time on the Linux console. Furthermore, I spend some time writing software and tooling. But I also spend a lot of time on Slack (substitute your organization’s preferred chat platform) communicating with humans.¹

Bridging these domains often requires copying and pasting information, and sometimes reformatting it. At one point, I was so annoyed by moving console output from a terminal window to Slack that I decided to find a better way.

My idea was to have a single, statically linked binary that I can scp to a system and that would run without further setup. The job of that binary helper would be to post to Slack on my behalf.
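
As a rough sketch of how such a binary can be produced and shipped (the output name and target host here are placeholders; with pure-Go dependencies, disabling cgo is usually enough to get a fully static binary):

$ CGO_ENABLED=0 go build -o helper .
$ scp helper some-jump-host: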

To honor the great inventor (and actress) Hedy Lamarr, my little helper was named after her.

Great, we have a problem, a solution, and the name hedybot. But what are actual use cases in a world striving for full automation? Actually, there are still a lot of manual tasks left, including:

  • Tasks that require human oversight to avoid disaster
  • One-time but often long-running tasks

Manual Deployment Notifications

One example of a task that requires human oversight at my workplace is the deployment of Domain Name System (DNS) changes. Since a mistake here can easily cost thousands of dollars and an unmeasurable loss of customer trust, we tend to have an experienced engineer deploy the changes. For additional assurance, we always post the deployed changes to Slack for everyone to read. People double-check and sometimes ask questions about the changes. That is a wonderful use case for hedybot! Here it is in action, using dns-tools:

$ rrpush --quiet --dry-run=false --delay=0 --no-color 2>&1 \
  | hedybot --channel-id=FOO2342 --title="Deployment on Production DNS"

In Slack, the message shows up as a colored attachment titled "Deployment on Production DNS", with the rrpush output as its body.

By the way, the color follows some loose internal convention and is hardcoded. A potential improvement would be to make the color configurable via a command-line flag.
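
A minimal sketch of that improvement, assuming a hypothetical -color flag next to the existing ones (only the affected lines of the full source below are shown):

  // Hypothetical flag: override the hardcoded attachment color.
  color := flag.String("color", "#FFA500",
    "Hex color of the Slack message attachment")
  flag.Parse()

  // ...and later, when building the message parameters:
  params := slack.PostMessageParameters{
    AsUser: true,
    Attachments: []slack.Attachment{
      {
        Color: *color,
        // Title and Value fields as before.
      },
    },
  }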

Long-running Jobs

Another great use case for hedybot is a long-running job. Let’s assume there is a server that we need to wipe to comply with regulations. One could easily lose track of such a task once it is started. Daily business and occasional firefighting push less urgent matters aside, and soon our brains have forgotten about them. This is where a little helper comes in handy by posting a quick message:

$ dd if=/dev/urandom of=/dev/sdx bs=4096; \
  echo "disk erase finished" | hedybot --title="Example Server"

The resulting message is clear and simple: an attachment titled "Example Server" with the text "disk erase finished".

Thanks to the timely reminder, we can decommission the server right away and save some money here.

Hedybot Source Code

Here is the Golang code that I used. Grab it to craft your own little helper.

package main

import (
  "flag"
  "io/ioutil"
  "log"
  "os"

  "github.com/nlopes/slack"
)

const (
  // fetch API key from your slack workspace
  apiKey = "xxxx-xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxx"
)

func main() {
  channelID := flag.String("channel-id", "C85PT1ULR",
    "ID of the channel to post to")
  title := flag.String("title", "Message",
    "Title for the message to post")
  flag.Parse()

  bytes, err := ioutil.ReadAll(os.Stdin)
  if err != nil {
    log.Fatalf("read stdin: %v", err)
  }
  if len(bytes) < 1 {
    log.Fatalf("stdin is empty")
  }
  report := string(bytes)

  params := slack.PostMessageParameters{
    AsUser: true,
    Attachments: []slack.Attachment{
      {
        Color: "#FFA500",
        Fields: []slack.AttachmentField{
          {
            Title: *title,
            Value: report,
          },
        },
      },
    },
  }
  api := slack.New(apiKey)
  _, _, err = api.PostMessage(*channelID, "", params)
  if err != nil {
    log.Fatalf("post report: %v", err)
  }
}
  1. The tricky part of my job is to figure out which activity is worth automating, which activity requires time boxing, and when going deep into the details is advised.