Startup hacks and engineering miracles from your exhausted friends at Faraday

How to export a Dataiku DSS (or any scikit-learn) model to PMML

Andy Rossmeissl on

This post is part of our data science series

At Faraday we use Dataiku to do ad hoc exploratory data science work, and especially for investigating new predictive techniques before building them into our platform.

Dataiku is awesome and has an incredibly responsive team. One drawback for me, however, has been Dataiku's lack of support for PMML, a standard serialization format for predictive models and their associated apparatus.

Luckily with a little hacking you can export a Dataiku model to PMML. And this technique can work anywhere you have a scikit-learn-based model you're trying to export.

Prerequisites

We're going to use Dataiku's built-in Python environment, which lives in your DSS data directory (generally /Users/username/Library/DataScienceStudio/dss_home on a Mac). We need to add a couple libraries first:

$ cd $DSS_DATA_DIR
$ ./bin/pip install sklearn_pandas
$ ./bin/pip install git+https://github.com/jpmml/sklearn2pmml.git

You'll also need a working JDK. If this doesn't work:

$ java -version
java version "1.8.0_121"  

Then install a JDK. (On Mac: brew cask install java.)

Locate your classifier

OK, now let's get our hands on the model you're trying to export. Maybe it's already in memory, but more likely it's pickled on disk. With Dataiku, you'll find your pickled classifier in a path that looks like this:

$DSS_DATA_DIR/analysis-data/PROJECTKEY/abcdefgh/ijklmnop/sessions/s1/pp1/m1

There it is, clf.pkl. It's helpful to copy this file into your working dir so we don't accidentally disturb it.
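If you're not sure which session directory holds the model you want, a few lines of Python can list the candidates. This is a minimal sketch that assumes only the layout shown above (a clf.pkl somewhere under analysis-data). The helper name find_pickled_classifiers is made up for illustration, and because it relies on pathlib it should be run with a Python 3 interpreter rather than the Python 2.7 console used later in this post.

```python
from pathlib import Path


def find_pickled_classifiers(dss_data_dir):
    """Return every clf.pkl under <dss_data_dir>/analysis-data, sorted by path."""
    root = Path(dss_data_dir) / "analysis-data"
    # rglob walks the tree recursively, so no directory depth is assumed.
    return sorted(root.rglob("clf.pkl"))
```

Calling find_pickled_classifiers with the value of $DSS_DATA_DIR returns a list of paths you can then copy from.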

Export the model to PMML

Now let's start up an interactive Python console — again using Dataiku's built-in environment:

$ cd $DSS_DATA_DIR
$ ./bin/python
Python 2.7.10 (default, Oct 23 2015, 19:19:21)  
>>>

First let's load up some libraries:

>>> from sklearn.externals import joblib
>>> from sklearn2pmml import PMMLPipeline
>>> from sklearn2pmml import sklearn2pmml

Now we'll unmarshal the model using joblib, a pickle-compatible serialization library:

>>> clf = joblib.load('/path/to/clf.pkl')

Here's the only tricky part: we have to wrap the trained estimator in a Pipeline-like object that sklearn2pmml understands. (This is likely to get less tricky soon.)

>>> pipeline = PMMLPipeline([
...   ("estimator", clf)
... ])

And finally perform the export:

>>> sklearn2pmml(pipeline, "clf.pmml")
INFO: Parsing PKL..  
[snip]
INFO: Marshalled PMML in 714 ms.  

All done! The heavy lifting here is done by sklearn2pmml, which wraps the JPMML-SkLearn library. Thanks to Villu Ruusmann in particular for his help.

How we made our CSV processing 142x faster

Bill Morris on

This post is part of our data science hacks series

At Faraday, we've long used csvkit to understand, transform, and beat senseless our many streams of data. However, even this inimitable Swiss Army knife can be improved on: we've switched to xsv.

xsv is a fast CSV-parsing toolkit written in Rust that mostly matches the functionality of csvkit (including the clutch ability to pipe between modules), with a few extras tacked on (like smart sampling). Did I mention it's fast? In a head-to-head comparison, I ran the "stats" module of xsv against "csvstat" from csvkit on a 30k-line, 400-column CSV file:

  • Python-based csvkit chews through it in a respectable-and-now-expected 4m16s.

  • xsv takes 1.8 seconds. I don't even have time for a sip of my coffee.
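For the record, the headline number is just the ratio of those two timings. A quick check, using only the figures quoted above:

```python
csvkit_seconds = 4 * 60 + 16  # 4m16s for csvstat
xsv_seconds = 1.8             # xsv stats

speedup = csvkit_seconds / xsv_seconds
print(f"{speedup:.0f}x")  # 142x
```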

The difference between csvkit and xsv is partly defined by scale; both tools are plenty fast on smaller datasets. But once you get into the 10 MB-and-upward range, xsv's processing speed pulls away dramatically.

If you've been using csvkit forever (like me), or if you want to be able to transform and analyze CSVs without loading them into a DB, give xsv a shot:

Install Rust

curl https://sh.rustup.rs -sSf | sh  

. . . which also gives you the Rust package manager, cargo, which lets you:

Install xsv

cargo install xsv  

Then be sure your PATH is configured correctly:

export PATH=~/.cargo/bin:$PATH  

. . . and try it out on a demo CSV with 10k rows, some messy strings, and multiple data types:

curl https://gist.githubusercontent.com/wboykinm/044e2af62fc0c7f77e17f6ccd55b8fb0/raw/fca391e6c03a06a7be770fefca6c47a9acdd2305/mock_data.csv \  
| xsv stats \
| xsv table

(xsv table formats the data so it's readable in the console):

field           type     sum                 min                  max                  min_length  max_length  mean                stddev  
id              Integer  5005000             1                    1000                 1           4           500.49999999999994  288.6749902572106  
first_name      Unicode                      Aaron                Willie               3           11  
last_name       Unicode                      Adams                Young                3           10  
email           Unicode                      aadamsp5@senate.gov  wwrightd8@upenn.edu  12          34  
gender          Unicode                      Female               Male                 4           6  
ip_address      Unicode                      0.111.40.87          99.50.37.244         9           15  
value           Unicode                      $1007.98             $999.37              0           8  
company         Unicode                      Abata                Zoovu                0           13  
lat             Float    243963.82509999987  -47.75034            69.70287             0           9           24.42080331331331   24.98767816017553  
lon             Float    443214.19009999954  -179.12198           170.29993            0           10          44.36578479479489   71.16647723898215  
messed_up_data  Unicode                      !@#$%^&*()           𠜎𠜱𠝹𠱓𠱸𠲖𠳏       0           393  
version         Unicode                      0.1.1                9.99                 3           14  

Happy parsing!

scrubcsv: clean CSVs, drop bad lines

Seamus Abshere on

This is part of our series on things that are obvious once you see them - and our data science series because it belongs in your toolchain.

Lies, damn lies, and commercial CSV export modules. Who wrote these things? On what planet would this be acceptable? Whatever.

Name,What's wrong  
"Robert "Bob" Smith",quotes inside quotes
Robert "Bob" Smith,quotes in the middle  
Robert Bob" Smith,unmatched quote  

Ruby dies immediately trying to read it:

$ irb
irb(main):001:0> require 'csv'  
=> true
irb(main):002:0> CSV.read('broken.csv')  
CSV::MalformedCSVError: Missing or stray quote in line 2  

Introducing scrubcsv, a lightning-fast static binary written in Rust that best-effort parses CSV and then immediately dumps back out 100% guaranteed standards-compliant CSV. Top speed? About 67 MB/s.

$ scrubcsv broken.csv > fixed.csv
4 rows (0 bad) in 0.00 seconds, 787.13 KiB/sec

$ cat fixed.csv
Name,What's wrong  
"Robert Bob"" Smith""",quotes inside quotes
"Robert ""Bob"" Smith",quotes in the middle
"Robert Bob"" Smith",unmatched quote

It uses BurntSushi's world-beating CSV parser which is almost certainly faster than your SSD.
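The quoting you see in fixed.csv is the standard CSV rule: a field that contains a quote is wrapped in double quotes, and every embedded quote is doubled. Here's a small sketch of that rule using Python's built-in csv module. It is not how scrubcsv is implemented, and to_csv_line is a name made up for this example.

```python
import csv
import io


def to_csv_line(fields):
    """Serialize one row using standard CSV quoting."""
    buf = io.StringIO()
    # QUOTE_MINIMAL quotes only the fields that need it; embedded quotes are doubled.
    writer = csv.writer(buf, quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
    writer.writerow(fields)
    return buf.getvalue()


line = to_csv_line(['Robert "Bob" Smith', "quotes in the middle"])
print(line, end="")  # "Robert ""Bob"" Smith",quotes in the middle
```

Reading that line back with csv.reader returns the original two fields, which is the round trip you want from a well-formed file.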

No MD5, SHA1, or SHA256 collisions for US addresses

Seamus Abshere on

I calculated hashes of single-family home addresses in the United States:

create table hashtest as (  
  select
    house_number_and_street,
    city,
    state,
    digest(upper(house_number_and_street || ',' || city || ',' || state), 'md5') as "md5",
    digest(upper(house_number_and_street || ',' || city || ',' || state), 'sha1') as "sha1",
    digest(upper(house_number_and_street || ',' || city || ',' || state), 'sha256') as "sha256"
  from households
)

E.g.,

=> select upper(house_number_and_street || ',' || city || ',' || state) "key", digest(upper(house_number_and_street || ',' || city || ',' || state), 'md5') "md5" from households limit 1;
             key               |                md5
-------------------------------+------------------------------------
 1024 PENINSULA DR,WESTWOOD,CA | \x511cdfb25d6b77d45742ed0407b5c2ef
(1 row)

Then I counted the distinct hashes:

=> select count(distinct md5) md5, count(distinct sha1) sha1, count(distinct sha256) sha256, count(*) from hashtest;
   md5    |   sha1   |  sha256  |  count
----------+----------+----------+----------
 78224992 | 78224992 | 78224992 | 81087108
(1 row)

Some of the addresses are repeated in the database because the APNs are identical, but the conclusion is that we have 78 million uniques and no hash collisions with the algorithms tested.
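If you want to try the same check outside Postgres, here is a rough sketch with Python's hashlib. The function name and the third sample address are invented for illustration. The idea is that a collision would show up as fewer distinct digests than distinct keys.

```python
import hashlib


def count_distinct_digests(keys, algorithms=("md5", "sha1", "sha256")):
    """Return {algorithm: number of distinct digests} over the upper-cased keys."""
    counts = {}
    for name in algorithms:
        digests = {hashlib.new(name, key.upper().encode("utf-8")).digest() for key in keys}
        counts[name] = len(digests)
    return counts


addresses = [
    "1024 Peninsula Dr,Westwood,CA",
    "1024 PENINSULA DR,WESTWOOD,CA",    # same key once upper-cased
    "1 Hypothetical St,Burlington,VT",  # made-up sample row
]
counts = count_distinct_digests(addresses)
distinct_keys = len({address.upper() for address in addresses})
print(counts, distinct_keys)
```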

Open letter to Slack: Fitts's law and channel weeding

Seamus Abshere on

Update: They fixed this! Thanks so much!

I've got a beef with Slack and I back it up with Fitts's law:

This scientific law predicts that the time required to rapidly move to a target area is a function of the ratio between the distance to the target and the width of the target.
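To put rough numbers on that, here is a sketch using the commonly cited Shannon formulation, MT = a + b * log2(D / W + 1). The constants a and b are normally fitted from measurements for a given device; the defaults below are placeholders, not measured values, and the pixel sizes are invented.

```python
import math


def movement_time(distance, width, a=0.0, b=0.1):
    """Shannon formulation of Fitts's law: MT = a + b * log2(distance / width + 1)."""
    return a + b * math.log2(distance / width + 1)


# Same distance, wider target: the predicted time drops.
small_target = movement_time(distance=700, width=100)   # index of difficulty log2(8) = 3 bits
whole_message = movement_time(distance=700, width=700)  # index of difficulty log2(2) = 1 bit
print(small_target, whole_message)
```

Whatever the real constants are, making the whole message clickable shrinks the logarithm term, which is the point of the request.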

Here's the beef:

it's hard to choose what messages to delete

As a remote team manager, I try to weed our Slack channels by deleting bot explosions and repetitive error dumps.

  1. Obey Fitts's law and let me click anywhere on the message to select it.
  2. Let me specify a regex (e.g. Unknown account) that will mass-select messages for deletion.

Huzzah!

How to reverse geocode in bulk

Bill Morris on

This post is part of our practical cartography series.

We just rebuilt our Argo reverse-geocoding module as a proper command-line tool. Got a pile of coordinates in a table like this?

Pipe them through argo to get the context of an address assigned to each of them:

npm install argo-geo -g  
argo -i myfile.csv -a "blahblahmapzenauthtoken"  

Using Mapzen search, that'll churn through your table at 6 queries per second, appending results to each coordinate pair until it's done:

We built this to process millions of rooftop coordinates that a vendor provided to us without addresses, but you could just as easily use it for any position-only datasets:

  • Bird sightings from the field
  • Cars auto-extracted from imagery
  • GPS tracks from that pub crawl where you forgot the names of the bars
  • Mobile-collected reports of voter intimidation
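At 6 queries per second, a big file is a long-running job, so it's worth estimating the wall-clock time before you start. A back-of-the-envelope sketch (the function name is made up, and the rate is simply the figure quoted above):

```python
def hours_to_geocode(n_coordinates, queries_per_second=6):
    """Estimated wall-clock hours to process n_coordinates at a fixed query rate."""
    return n_coordinates / queries_per_second / 3600


print(round(hours_to_geocode(1_000_000), 1))  # 46.3 hours per million points
```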

We named it "Argo" to follow the Greek mythology pattern of Mapzen's geocoding engine "Pelias". Google and Mapbox each offer reverse-geocoding services as well, but those are just that: services. They include TOUs that restrict caching of the results, and man, did we want to cache these. The good folks at Mapzen built their search architecture on some truly amazing open datasets, and they match the spirit of the source by allowing storage and repurposing.

Thanks, Mapzen!

Rewinding time with rr & Rust to debug a terrible error message

Eric Kidd on

Yesterday morning, I was confronted with a truly terrible error message from a Rust program I was developing:

$ cage up --init
...
Error: error getting the project's state from Docker  
ExpectedError("Object", "null")  

The first part of that error message comes from cage (our open source tool for developing Docker apps with lots of microservices), which tries to be as informative as it can. The second part of the error comes from rustc-serialize, and it means I'm about to have a very bad day.

Let's track this down, fix the bug, and improve the error message as much as possible. Along the way, we'll see how to run the debugger backwards (and why you'd want to), and how to enforce informative error messages in Rust.

Preliminary investigation

Here, cage is talking directly to the Docker daemon, and it's trying to get information about the containers railshello_db_1 and railshello_web_1. You can do this manually using docker:

$ docker inspect railshello_db_1 railshello_web_1
[
    {
        "Id": "ab759d6b03d1c035ad4d6ef3b7b72427b6d2911b87be7734a8fee5c6f912fa54",
        "Created": "2016-10-25T11:57:05.845095715Z",
        "Path": "/docker-entrypoint.sh",
        "Args": [
            "postgres"
        ],
        "State": {
            "Status": "running",
            "Running": true,
            "Paused": false,
            "Restarting": false,
            "OOMKilled": false,

This continues for a total of 428 lines of JSON, most of it poorly documented. We're using rustc-serialize to automatically deserialize this JSON into Rust data structures which look like:

#[derive(Debug, Clone, RustcEncodable, RustcDecodable)]
#[allow(non_snake_case)]
pub struct ContainerInfo {  
    pub AppArmorProfile: String,
    pub Args: Vec<String>,
    pub Config: Config,
    pub Created: String,
    pub Driver: String,
    // ...
    pub Mounts: Vec<Mount>,
    pub Name: String,
    pub NetworkSettings: NetworkSettings,
    pub Path: String,
    pub ProcessLabel: String,
    pub ResolvConfPath: String,
    pub RestartCount: u64,
    pub State: State,
}

We have over 160 lines of this code, and ExpectedError("Object", "null") means that we have a null somewhere, but the corresponding field is demanding a full-fledged JSON Object. In other words, I wrote something like:

    pub Config: Config,

…that says we want an actual Config, when I should have wrapped that type in Option to specify that it's allowed to be null:

    pub Config: Option<Config>,

It's days like this that make me curious about how much startup capital is needed to produce artisanal Vermont cheese. I've met some former bankers who are really into that, and they seem happy.

Try #1: Transforming JSON with jq

Whenever something goes wrong with JSON, I immediately reach for jq. This allows me to drill down into JSON data structures and transform them using the command line:

$ docker inspect railshello_db_1 | jq '.[] | .Config'
{
  "Hostname": "ab759d6b03d1",
  "Domainname": "",
  "User": "",
  "AttachStdin": false,
  "AttachStdout": false,
  "AttachStderr": false,
  ...

After about 10 minutes with jq and grep, I can't find a suspicious null anywhere. Obviously, the bug is right in front of me, but I'm not seeing it. This usually means that either I'm looking in the wrong place, or one of my assumptions is wrong. I need better data about what's going on, or else I'll waste hours speculating.
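In hindsight, a more systematic version of this search would be to list every path in the JSON whose value is null, rather than eyeballing it. Here's a sketch of that idea in Python. It isn't what I ran at the time, null_paths is a made-up helper, and the sample document is an abbreviated, hypothetical stand-in for real docker inspect output.

```python
import json


def null_paths(value, path="$"):
    """Yield a JSONPath-like string for every null in a decoded JSON value."""
    if value is None:
        yield path
    elif isinstance(value, dict):
        for key, child in value.items():
            yield from null_paths(child, f"{path}.{key}")
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from null_paths(child, f"{path}[{index}]")


# Hypothetical, heavily abbreviated sample; "SomeField" is not a real Docker key.
sample = json.loads('[{"Config": {"Hostname": "ab759d6b03d1", "SomeField": null}}]')
print(list(null_paths(sample)))  # ['$[0].Config.SomeField']
```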

Try #2: Time-travelling debugging!

When logic and reason fail, it's time to reach for the debugger. We have a little test program that we can use, containing the following function:

fn find_all_exported_ports() -> Result<()> {  
    let docker = try!(Docker::connect_with_defaults());
    let containers = try!(docker.get_containers(false));
    for container in &containers {
        let info = try!(docker.get_container_info(&container));
        let ports: Vec<String> = info.NetworkSettings.Ports.keys()
            .cloned()
            .collect();
        println!("{}: {}", &info.Name, ports.join(", "));
    }
    Ok(())
}

Unfortunately, do you remember all those structs with declarations like this?

#[derive(Debug, Clone, RustcEncodable, RustcDecodable)]

Those declarations mean that we're going to be dealing with generated code that calls more generated code. And because Rust doesn't have exceptions, we can't tell it to "break on exception" and allow us to poke around.

This seems like a great opportunity to try out rr, which allows us to run programs backwards! Let's build our test program and record an execution trace using rr record:

$ cargo build --example findports
$ rr record target/debug/examples/findports

Now let's load that execution trace into the debugger using rr replay:

$ rr replay
GNU gdb (Ubuntu 7.11.1-0ubuntu1~16.04) 7.11.1  
...
0x00007fd531b82cc0 in _start () from /lib64/ld-linux-x86-64.so.2  
(rr)

From here, we can replay the crash using c (for "continue"):

(rr) c
Continuing.  
Error: ExpectedError("Object", "null")

Program received signal SIGKILL, Killed.  
0x0000000070000002 in ?? ()  

So now that we have a crash, let's work backwards to where it all went wrong. Ideally, we want the name of the JSON field which contained the unexpected null. This will be named something like "Config".

So now we need to dig through rustc-serialize/src/json.rs and find out how it handles field names. After some poking around, this looks promising:

    fn read_struct_field<T, F>(&mut self,
                               name: &str,
                               _idx: usize,
                               f: F)
                               -> DecodeResult<T> where
        F: FnOnce(&mut Decoder) -> DecodeResult<T>,

So what we want to do is look at the last couple of read_struct_field calls before the program failed, find the value of name they received, and check that against our Rust data structures.

Setting breakpoints on Rust functions is tricky, because they have huge compound names. Fortunately, we can use rbreak to set a breakpoint using a regular expression as follows:

(rr) rbreak rustc_serialize::json::.*read_struct_field.*

This produces three pages of output informing me it set 18 breakpoints on different versions of this function. This happens because the Rust compiler creates multiple copies of "generic" functions for performance.

And now for the fun part. Backwards execution! We use rc ("reverse continue") to run the program backwards until we hit a breakpoint:

(rr) rc
Continuing.

Breakpoint 17, rustc_serialize::json::{{impl}}::read_struct_field<std::collections::hash::map::HashMap<collections::string::String, std::collections::hash::map::HashMap<collections::string::String, collections::string::String, std::collections::hash::map::RandomState>, std::collections::hash::map::RandomState>,fn(&mut rustc_serialize::json::Decoder) -> core::result::Result<std::collections::hash::map::HashMap<collections::string::String, std::collections::hash::map::HashMap<collections::string::String, collections::string::String, std::collections::hash::map::RandomState>, std::collections::hash::map::RandomState>, rustc_serialize::json::DecoderError>> (self=0x7fff57e51128, name=...,  
    _idx=14, f=0x0)
    at /home/emk/.cargo/registry/src/github.com-1ecc6299db9ec823/rustc-serialize-0.3.19/src/json.rs:2227
2227        fn read_struct_field<T, F>(&mut self,  
(rr)

Now let's print out the name argument to read_struct_field:

(rr) set print pretty on
(rr) p name
$2 = {
  data_ptr = 0x5597aeec409f <str17130> "VolumesWorkingDirMountSourceDestinationModeRWPropagationBridgeEndpointIDGateway",
  length = 7
}

This is a raw Rust &str slice, pointing to 7 bytes in a longer string literal. This tells us we're looking at a field named Volumes. Some quick digging around reveals such a field in struct Config:
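If the pointer-plus-length representation is unfamiliar, this is all it amounts to, sketched in Python with the same bytes gdb printed:

```python
literal = b"VolumesWorkingDirMountSourceDestinationModeRWPropagationBridgeEndpointIDGateway"
length = 7

# A &str is (pointer, length): only the first `length` bytes belong to the slice.
name = literal[:length].decode("utf-8")
print(name)  # Volumes
```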

#[derive(Debug, Clone, RustcEncodable, RustcDecodable)]
#[allow(non_snake_case)]
pub struct Config {  
    pub AttachStderr: bool,
    pub AttachStdin: bool,
    pub AttachStdout: bool,
    // ...
    pub Volumes: HashMap<String, UnspecifiedObject>,
    pub WorkingDir: String,
}

This looks like it might be our culprit! We can change this to:

    pub Volumes: Option<HashMap<String, UnspecifiedObject>>,

…and re-run our program:

$ cargo run --example findports
   Compiling docker v0.0.41 (file:///home/emk/w/src/cage/rust-docker)
    Finished debug [unoptimized + debuginfo] target(s) in 9.72 secs
     Running `target/debug/examples/findports`
/railshello_web_1: 3000/tcp
/railshello_db_1: 5432/tcp

Yay! It works! But this won't be the last time we find this problem, so let's try to improve this error message a bit.

Chaining errors in Rust

We need to look at the code calling rustc_serialize::json::decode and see if we can provide a little bit more context for this error. The relevant code is in a fork of rust-docker and it uses the excellent error-chain library to handle errors:

error_chain! {  
    // Provide wrappers for other libraries' errors.
    foreign_links {
        env::VarError, EnvVar;
        hyper::Error, Hyper;
        io::Error, Io;
        rustc_serialize::json::DecoderError, Json;
    }

    // Define our own errors.
    errors {
        CouldNotConnect(host: String) {
            description("could not connect to Docker")
            display("could not connect to Docker at '{}'", &host)
        }

        // ...
    }
}

error_chain! is a giant macro that generates a bunch of error-handling code at compile time. The foreign_links section, in particular, allows us to automatically convert other libraries' errors into our own docker::Error type.

But rustc_serialize has horrible error messages, so we don't want to be allowed to automatically convert its errors into our own. We want Rust to force us to add useful information. So let's remove this line from the foreign_links block:

rustc_serialize::json::DecoderError, Json;  

…and add a new custom error type to the errors block:

ParseError(wanted: &'static str, input: String) {  
    description("error parsing JSON from Docker")
    display("error parsing JSON for {} from Docker", wanted)
}

Here, &'static str means wanted is a literal string slice, and String means that input is a regular owned string. We don't display input because it's huge, but we include it in the error so that our caller can log it if desired.

When we re-run cargo test, we get the following errors:

error[E0277]: the trait bound `errors::Error: std::convert::From<rustc_serialize::json::DecoderError>` is not satisfied  
   --> src/docker.rs:346:20
    |
346 |         let info = try!(json::decode::<ContainerInfo>(&body));  
    |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ trait `errors::Error: std::convert::From<rustc_serialize::json::DecoderError>` not satisfied

This is telling us that we can no longer automatically convert rustc_serialize::json::DecoderError into our own Error type. This is exactly what we wanted! Now we need to change:

let info = try!(json::decode::<ContainerInfo>(&body));  

…to:

let info = try!(json::decode::<ContainerInfo>(&body)
    .chain_err(|| ErrorKind::ParseError("ContainerInfo", body)));

This will wrap the json::DecoderError in our new ParseError type, giving us a slightly less awful error message:

Error: error parsing JSON for ContainerInfo from Docker  
ExpectedError("Object", "null")  

I should probably include a link to this blog post in that error message for the next person to hit one of these bugs! Unfortunately, we can't do much better without patching rustc-serialize.
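The wrap-the-low-level-error-with-context pattern isn't specific to Rust. For comparison, here is roughly the same idea sketched in Python, where raise ... from plays the role of chain_err. The ParseError class and decode_container_info function are hypothetical names that mirror the Rust code above; this is not part of cage or rust-docker.

```python
import json


class ParseError(Exception):
    """Hypothetical wrapper carrying what we wanted and the raw input."""

    def __init__(self, wanted, input_text):
        super().__init__(f"error parsing JSON for {wanted} from Docker")
        self.wanted = wanted
        self.input_text = input_text  # kept for logging, not displayed


def decode_container_info(body):
    try:
        return json.loads(body)
    except json.JSONDecodeError as err:
        # `raise ... from err` keeps the low-level error reachable as __cause__.
        raise ParseError("ContainerInfo", body) from err
```

A caller sees the high-level message first and can still reach the original decoder error through __cause__.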

If you maintain Rust code, please try to provide helpful, high-quality error messages with plenty of context. There are some great tools for doing this, but not all libraries use them.

How to use Postgres cursors and multi-insert in Ruby with Sequel

Seamus Abshere on

This post is part of our PostgreSQL series.

SQL cursors are a great way to stream data out of a database without loading it all into memory.

Reading using a cursor

Sequel has use_cursor:

BATCH_SIZE = 1000
DB[:widgets].
  select(:id, :name, :power).
  use_cursor(rows_per_fetch: BATCH_SIZE).each do |row|
    row[:id] # whatever
  end

Writing using cursors and multi-insert

Here's a real-world example - we had to copy things from widgets to widgets_copy.

We thought we were being really clever, keeping all the data inside Postgres:

# NOTE: this is not the recommended method, but it works too
batches = (desired_count.to_f / BATCH_SIZE).ceil  
batches.times do |i|  
  offset = i * BATCH_SIZE
  DB.run <<-SQL
    INSERT INTO widgets_copy (
      id,
      name,
      power,
      created_at,
      updated_at
    ) (
      SELECT
        uuid_generate_v4(),
        name,
        power,
        now(),
        now()
      FROM widgets
      ORDER BY id
      LIMIT #{BATCH_SIZE}
      OFFSET #{offset}
    )
  SQL
end  

Even with 244 GB of RAM, this sometimes stopped dead 75% of the way through. We rewrote it (pulling the data into Ruby, even) and the new version is faster and no longer stops in the middle. We're using Sequel's multi_insert.

batch = []  
now = Sequel.function :now  
uuid_generate_v4 = Sequel.function :uuid_generate_v4  
DB[:widgets].  
  select(:name, :power).
  order(:id).
  use_cursor(rows_per_fetch: BATCH_SIZE).each do |row|
    batch << row.merge!(
      id: uuid_generate_v4,
      updated_at: now,
      created_at: now
    )
    if batch.length >= BATCH_SIZE
      DB[:widgets_copy].multi_insert batch
      batch.clear
    end
  end
# Flush whatever is left when the row count isn't a multiple of BATCH_SIZE.
DB[:widgets_copy].multi_insert batch unless batch.empty?

Thanks to @dkastner and @devTristan who promoted these ideas internally!
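The part that's easy to get wrong in any language is the last, partial batch. Here is the batching logic on its own, sketched in Python with no database involved; batched is a helper written for this example, and the row counts are made up.

```python
from itertools import islice
from math import ceil


def batched(rows, batch_size):
    """Yield lists of up to batch_size rows, including the final partial batch."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


batches = list(batched(range(2500), 1000))
print([len(batch) for batch in batches])  # [1000, 1000, 500]
print(ceil(2500 / 1000))                  # 3, the batch count the LIMIT/OFFSET version computes
```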

3 reasons Citus is the best (non-Heroku) Postgres host available today

Seamus Abshere on

Citus Cloud is the hosted Postgres that our app uses. We tried EnterpriseDB Postgres Plus Cloud and couldn't even get going; we used Amazon RDS for a year but migrated away about 6 months ago. Heroku Postgres wasn't bare-metal enough for us.

1. Log directly to Papertrail

[Screenshot of logging setup]

It's not a world-shattering invention, but features like sending your Postgres logs to a syslog host are very useful for debugging.

Citus takes feature requests and actually implements them (this was one of mine).

RDS has a weird home-grown in-browser log viewer which is useless without things like rds-pgbadger.

2. citus extension preinstalled

You know that feeling where you're so happy you can use SQL, but you worry just a bit 🙏🏽 about your horizontal scaling story? Citus is the company behind the citus extension to Postgres:

Citus parallelizes incoming queries by breaking it into multiple fragment queries which run on the worker shards in parallel [...] The workers are simply running extended PostgreSQL servers and they apply PostgreSQL’s standard planning and execution logic to run these fragment SQL queries. Therefore, any optimization that helps PostgreSQL also helps Citus.

(from Querying Distributed Tables)

Speaking of extensions, EnterpriseDB didn't even come with PostGIS the last time we checked; to get it you needed a Windows-based (!!) "StackBuilder" tool. Support was based in India and used an ancient Salesforce interface. You get the idea.

3. Everything is negotiable

Citus gave me a great price to move over from RDS.

You're not juggling Amazon on-demand and reserved instances, or forgetting your snapshot settings and paying $2000/mo for obsolete EBS snapshot storage (did that!), or being a tiny fish in the massive AWS pond.

You're not spending a month of negotiations on the phone with EnterpriseDB "execs," paying a huge up-front fee, and then seeing the project fail utterly when your top devops engineer couldn't install a basic Postgres extension.

This is a company with people like Craig Kerstiens (helped build Heroku Postgres) on the other end of the phone, live support chat, and a real value prop.

Saved by the compiler: Parallelizing a loop with Rust and rayon

Eric Kidd on

This post is part of our rust series.

The Rust compiler just saved me from a nasty threading bug. I was working on cage (our open source development tool for Docker apps with lots of microservices), and I decided to parallelize the routine that transformed docker-compose.yml files. This was mostly an excuse to check out the awesome rayon library, but it turned into a great example of what real-world Rust development is like.

The original routine looked something like this:

/// Process our pods, flattening and transforming them using our
/// plugins, and output them to the specified directory.
fn output_helper(&self, op: Operation, export_dir: &Path) -> Result<()> {  
    // Output each pod.
    for pod in &self.pods {
        // Don't export pods which aren't enabled.
        if !pod.enabled_in(&self.current_target) {
            continue;
        }

        // Figure out where to put our pod.
        // ...

        // Combine overrides, make it standalone, tweak as needed, and
        // output.
        let mut file = try!(pod.merged_file(&self.current_target));
        try!(file.make_standalone(&self.pods_dir()));
        let ctx = plugins::Context::new(self, pod);
        try!(self.plugins().transform(op, &ctx, &mut file));
        try!(file.write_to_path(out_path));
    }
    Ok(())
}

To convert this to a parallel loop, I started by changing:

for pod in &self.pods {  

To:

self.pods.par_iter().map(|pod| -> Result<()> {  

Here, Result<()> means "this closure might return an error, or it might return an empty tuple, basically void." (For more information on Rust error handling, check out the Rust book.) But it couldn't be that easy, could it?

Nope. The Rust compiler showed me the following error message. It's long, but I'll translate:

error[E0277]: the trait bound `plugins::PluginTransform + 'static: std::marker::Sync` is not satisfied  
   --> src/project.rs:397:30
    |
397 |         self.pods.par_iter().map(|pod| -> Result<()> {  
    |                              ^^^ trait `plugins::PluginTransform + 'static: std::marker::Sync` not satisfied
    |
    = note: `plugins::PluginTransform + 'static` cannot be shared between threads safely
    = note: required because it appears within the type `Box<plugins::PluginTransform + 'static>`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `alloc::raw_vec::RawVec<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `std::vec::Vec<Box<plugins::PluginTransform + 'static>>`
    = note: required because it appears within the type `plugins::Manager`
    = note: required because it appears within the type `std::option::Option<plugins::Manager>`
    = note: required because it appears within the type `project::Project`
    = note: required because it appears within the type `&project::Project`
    = note: required because it appears within the type `&&project::Project`
    = note: required because it appears within the type `[closure@src/project.rs:397:34: 424:10 op:&plugins::Operation, self:&&project::Project, export_dir:&&std::path::Path]`

The key bit to take away here is that "Sync is not satisfied" for the trait plugins::PluginTransform. In Rust, Sync is a special trait that tells the compiler that it's safe to share an object between two threads. That whole long list of "notes" afterwards looks intimidating, but it's really just telling me that my PluginTransform objects live inside my plugins::Manager type, which in turn lives inside my Project. The Rust compiler is verbose, but it tries to be helpful.

A bit of investigation reveals that PluginTransform inherits from the Plugin trait:

pub trait PluginTransform: Plugin {  

So we can fix this problem by changing the declaration of Plugin from:

pub trait Plugin {  

To:

pub trait Plugin: Sync {  

This says, "All types implementing Plugin must also allow me to access them from multiple threads." With that fixed, I re-run cargo test and get a similar message:

error[E0277]: the trait bound `plugins::transform::vault::GenerateToken + 'static: std::marker::Sync` is not satisfied  
   --> src/plugins/transform/vault.rs:230:6
    |
230 | impl plugins::Plugin for Plugin {  
    |      ^^^^^^^^^^^^^^^ trait `plugins::transform::vault::GenerateToken + 'static: std::marker::Sync` not satisfied
    |

This time, I change:

trait GenerateToken: Debug {  

To:

trait GenerateToken: Debug + Sync {  

This says, "All types which implement GenerateToken must also implement Debug (so I can print them), as well as Sync (so I can share them)." We're making progress!

Re-running cargo test, however, reveals the actual bug, and it would have been a nightmare to debug:

error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>: std::marker::Sync` is not satisfied  
   --> src/plugins/transform/vault.rs:124:6
    |
124 | impl GenerateToken for MockVault {  
    |      ^^^^^^^^^^^^^ trait `std::rc::Rc<std::cell::RefCell<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>: std::marker::Sync` not satisfied
    |

Uh-oh. In our test harness, we have a type MockVault, which contains an Rc<RefCell<_>>:

type MockVaultCalls = Rc<RefCell<Vec<(String, Vec<String>, VaultDuration)>>>;

/// A fake interface to vault for testing purposes.
#[derive(Debug)]
#[cfg(test)]
struct MockVault {  
    /// The tokens we were asked to generate.  We store these in a RefCell
    /// so that we can have "interior" mutability, because we don't want
    /// `generate_token` to be `&mut self` in the general case.
    calls: MockVaultCalls,
}

We use the MockVault to simulate a connection to HashiCorp's Vault, a secure central store for passwords and other secrets, which issues time-limited credentials. And when we test our Vault code, we use the calls member here to record all the requests that we would have made to Vault.

The type Rc<RefCell<_>> is a hack. The GenerateToken API assumes that our token-generator is a read-only object. This is good, because we want to access it from multiple threads! But in the test code, we need to create some "interior" mutable state. Basically, we ask Rust to replace compile-time borrow checks with run-time borrow checks. (For more details, see the Rust book.) But Rc<RefCell<_>> is a lightweight mechanism designed for single-threaded code.

The fix is to replace Rc<RefCell<_>> with Arc<RwLock<_>>, which is fully thread-safe:

type MockVaultCalls = Arc<RwLock<Vec<(String, Vec<String>, VaultDuration)>>>;  

Once we make this change, Rust reminds us to change all the code that accesses calls, too:

error: no method named `borrow_mut` found for type `std::sync::Arc<std::sync::RwLock<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>` in the current scope  
   --> src/plugins/transform/vault.rs:132:20
    |
132 |         self.calls.borrow_mut().push((display_name.to_owned(), policies, ttl));  
    |                    ^^^^^^^^^^
    |
...
error: no method named `borrow` found for type `std::sync::Arc<std::sync::RwLock<std::vec::Vec<(std::string::String, std::vec::Vec<std::string::String>, vault::client::VaultDuration)>>>` in the current scope  
   --> src/plugins/transform/vault.rs:368:23
    |
368 |     let calls = calls.borrow();  
    |                       ^^^^^^
    |

Phew. If we hadn't caught that Rc<RefCell<_>>, our test suites might have randomly segfaulted a couple of times a week, and it would have taken us weeks to track it down. I don't even think our current test suites could trigger this bug, but I bet a future version would have been able to, leaving a nasty surprise for us someday. The Rust compiler dug down through layers of data structures and found the one bit that wasn't thread safe.
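
For reference, here's roughly what the two flagged access sites look like once they go through the lock. This is a simplified sketch rather than the real test harness: the tuple drops `VaultDuration`, and `new`, `call_count` and `record_from_threads` are hypothetical helpers added so the example stands alone. The standard library calls are `RwLock::write` in place of `borrow_mut` and `RwLock::read` in place of `borrow`.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Simplified stand-in for `MockVaultCalls` (no `VaultDuration` here).
type MockVaultCalls = Arc<RwLock<Vec<(String, Vec<String>)>>>;

#[derive(Debug)]
struct MockVault {
    calls: MockVaultCalls,
}

impl MockVault {
    fn new() -> MockVault {
        MockVault { calls: Arc::new(RwLock::new(Vec::new())) }
    }

    // Still takes `&self`: the lock provides the interior mutability.
    fn generate_token(&self, display_name: &str, policies: Vec<String>) {
        // Previously: self.calls.borrow_mut().push(...)
        self.calls
            .write()
            .unwrap()
            .push((display_name.to_owned(), policies));
    }

    fn call_count(&self) -> usize {
        // Previously: self.calls.borrow()
        self.calls.read().unwrap().len()
    }
}

// Record one call from each of `threads` threads, then count them.
fn record_from_threads(threads: usize) -> usize {
    let vault = MockVault::new();
    thread::scope(|s| {
        for i in 0..threads {
            let vault = &vault;
            s.spawn(move || {
                vault.generate_token(&format!("pod{}", i), vec!["default".to_owned()]);
            });
        }
    });
    vault.call_count()
}
```

The `unwrap()` calls panic if another thread panicked while holding the lock, which is usually acceptable in test code.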

Wrapping it up

Rust also complains about the fact that we're calling continue from inside a closure:

error[E0267]: `continue` inside of a closure  
   --> src/project.rs:402:17
    |
402 |                 continue;  
    |                 ^^^^^^^^ cannot break inside of a closure

We can fix this by moving this code:

// Don't export pods which aren't enabled.
if !pod.enabled_in(&self.current_target) {  
    continue;
}

...into a filter call on our parallel iterator:

self.pods.par_iter()  
    // Don't export pods which aren't enabled.
    .filter(|pod| pod.enabled_in(&self.current_target))
    // Process each pod in parallel.
    .map(|pod| -> Result<()> {
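
The same transformation works on ordinary sequential iterators, which makes it easy to try in isolation. In this sketch the `Pod` struct and both functions are hypothetical, and an `enabled` field stands in for the real `enabled_in` check:

```rust
// Hypothetical, stripped-down pod: just a name and an enabled flag.
struct Pod {
    name: &'static str,
    enabled: bool,
}

// Loop form: `continue` skips the pods we don't want.
fn names_with_loop(pods: &[Pod]) -> Vec<&'static str> {
    let mut out = Vec::new();
    for pod in pods {
        if !pod.enabled {
            continue;
        }
        out.push(pod.name);
    }
    out
}

// Iterator form: the skip moves into a `filter` placed before the closure
// that does the real work, so the closure never needs `continue`.
fn names_with_filter(pods: &[Pod]) -> Vec<&'static str> {
    pods.iter()
        .filter(|pod| pod.enabled)
        .map(|pod| pod.name)
        .collect()
}
```

Both functions return the same list for the same input; only the second shape carries over to closure-based iterator chains.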

The final problem is that we now have a parallel computation that returns a bunch of Result<()> objects, and we need to boil them down to a single Result<()> like we had before. If we have a bunch of error messages, I'm happy to just pick one. No need to overwhelm the user.

I spoke to Josh Stone, and he helped me come up with the following:

    .map(|pod| -> Result<()> {
        // ...
    })
    // If more than one parallel branch fails, just return one error.
    .reduce_with(|result1, result2| result1.and(result2))
    .unwrap_or(Ok(()))

This says, "Given any two results named result1 and result2, take result1 if it's an error, otherwise take result2." It's basically a short-circuit && operator, but for Result values instead of booleans. I've filed an issue suggesting that rayon should provide those last two lines as a built-in function.

And yes, that's a genuine parallel map-reduce in Rust!
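
To see how `Result::and` behaves as the reducing function without pulling in rayon, here's a sequential sketch using the standard library's `Iterator::reduce`. The `combine` helper and the `String` error type are assumptions made for illustration; the real code uses the project's own `Result` alias and rayon's `reduce_with`.

```rust
// Hypothetical helper: boil many results down to one. Any error wins over
// `Ok(())`, and an empty list counts as success.
fn combine(results: Vec<Result<(), String>>) -> Result<(), String> {
    results
        .into_iter()
        // `a.and(b)` yields `a` if it is an error, otherwise `b`.
        .reduce(|result1, result2| result1.and(result2))
        // `reduce` returns `None` for an empty iterator.
        .unwrap_or(Ok(()))
}
```

Run sequentially like this, the error that survives is the first one in the list.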

So here's our final loop, using work-stealing parallelism:

fn output_helper(&self, op: Operation, export_dir: &Path) -> Result<()> {  
    // Output each pod.  This isn't especially slow (except maybe the
    // Vault plugin), but parallelizing things is easy.
    self.pods.par_iter()

        // Don't export pods which aren't enabled.
        .filter(|pod| pod.enabled_in(&self.current_target))

        // Process each pod in parallel.
        .map(|pod| -> Result<()> {
            // Figure out where to put our pod.
            // ...

            // Combine overrides, make it standalone, tweak as needed, and
            // output.
            let mut file = try!(pod.merged_file(&self.current_target));
            try!(file.make_standalone(&self.pods_dir()));
            let ctx = plugins::Context::new(self, pod);
            try!(self.plugins().transform(op, &ctx, &mut file));
            try!(file.write_to_path(out_path));
            Ok(())
        })

        // If more than one parallel branch fails, just return one error.
        .reduce_with(|result1, result2| result1.and(result2))
        .unwrap_or(Ok(()))
}

This was pretty painless! I have a lot more things to parallelize, of course. But this is what day-to-day Rust development is like: I have to do a bit of extra work to satisfy the compiler (which mostly becomes a reflex), and in turn the compiler ferrets out all kinds of subtle concurrency errors and generally watches my back. It's an interesting tradeoff, and overall I like it.