Steven Levine

TLDR - In this post I will walk through how to use BigQuery’s new capability of querying Hive Partitioned Parquet files in GCS. It is a really cool feature.

Some Background

I have a huge interest in Data Lakes, especially when it comes to the query engines that are capable of querying cloud object stores, like Spark, Presto, Hive, and Drill, among others. So when Google Cloud announced that BigQuery has beta support for querying Parquet and ORC files in Google Cloud Storage, it really piqued my curiosity. I had no choice but to find a large dataset in Parquet format and try to query it with BigQuery. Sounds easy enough, right?

To get started, I needed to find a large Hive Partitioned dataset to use. After some quick digging and searching, I wasn’t able to find one, so the only logical thing to do was to create my own. One of my favorite features of BigQuery is that it has tons of public datasets available for these sorts of things. Since I spend most of my days working in NYC, I have always found the NYC Taxi Data particularly interesting, so I thought, why not start there? For reference, the name of the dataset is bigquery-public-data:new_york_taxi_trips. This Dataset contains taxi rides partitioned by taxi company and year. For the purposes of this post, I will be using the tlc_yellow_trips_2018 table because it is the most recent and has nearly 18 GB of raw data.
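
If you are curious about the source table’s exact size and schema before exporting it, the bq command line can describe it; something along these lines should work:

bq show --format=prettyjson bigquery-public-data:new_york_taxi_trips.tlc_yellow_trips_2018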

Creating Hive Partitioned Data in GCS using Spark and BigQuery

With an interesting table in mind, the next step is to create a Hive Partitioned version of it in Google Cloud Storage in the Parquet format. There are countless ways to handle this; for the purposes of this post, I decided to use a simple Spark Shell script running on a Cloud DataProc cluster.

The first step is to spin up a Cloud DataProc cluster using the gcloud command line:

gcloud dataproc clusters create cluster-taxidata-extractor \
    --region us-central1 \
    --subnet default \
    --zone us-central1-a \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 500 \
    --num-workers 2 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 500 \
    --image-version 1.3-deb9 \
    --project ${PROJECT_ID}

The above command will spin up a 3-node cluster, with each node having 4 vCPUs and 15 GB of memory, providing YARN with 8 cores and 24 GB of memory. This seems like more than enough horsepower for the task. Once the cluster is operational, SSH in with the following command:

gcloud compute ssh ${HOSTNAME} --project=${PROJECT_ID} --zone=${ZONE} 

Once in, we need to spin up the Spark Shell, with one caveat: the spark-bigquery-connector must be added to its environment. This is necessary in order to leverage the latest and greatest BigQuery Storage API. The final command is:

spark-shell --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar 

Once in, it is easy to start exploring the data to figure out the best way to partition it. After a bit of review, I decided the most logical way to partition it is by ride date. There are two fields that can be used to achieve this: pickup_datetime or dropoff_datetime. I decided to use pickup_datetime, taking into account that some rides may start on one day and end on another (e.g., 11:45 PM to 12:30 AM); these rides will be counted on the day they originated. There is one wrinkle in this decision: the spark-bigquery-connector doesn’t have a native type to cast BigQuery’s DATETIME into, so it simply casts it to a STRING, which is not very useful as a partition key. Thus, custom code is needed to cast the DATETIME into a DATE. The final code looks something like:

// needed for col() in the spark-shell
import org.apache.spark.sql.functions.col

spark.read.format("bigquery")
    .option("table", "bigquery-public-data:new_york_taxi_trips.tlc_yellow_trips_2018")
    .load()
    .withColumn("trip_date", col("pickup_datetime").cast("date"))
    .write.partitionBy("trip_date")
    .format("parquet")
    .save("gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018")

The above code is pretty self-explanatory, but let’s quickly walk through it. First, we load the table from BigQuery into a DataFrame; next comes the cast mentioned above, followed by the partitioning information and file format; finally, we save it. After hitting return, I went to grab a coffee. By the time I got back to my desk, I had 18 GB of taxi ride data partitioned by trip_date in my GCS bucket. That was easy :smile:

For reference, the files should look something like:

❯ gsutil ls gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/_SUCCESS
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-01/
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-01/UUID.snappy.parquet
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-01/UUID.snappy.parquet
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-02/
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-02/UUID.snappy.parquet
gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01-02/UUID.snappy.parquet
...
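
If you want to double-check how much data actually landed in the bucket, gsutil can summarize it (an optional sanity check):

gsutil du -s -h gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018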

Pro Tip: This would be a great time to shut down the DataProc cluster since it is no longer needed.
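
In case it is helpful, deleting the cluster is a one-liner with gcloud, assuming the same cluster name and region used above:

gcloud dataproc clusters delete cluster-taxidata-extractor \
    --region us-central1 \
    --project ${PROJECT_ID}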

Create an External Table in BigQuery

Now that we have a sample Hive Partitioned dataset in GCS to work with, let’s set it up as an external table in BigQuery. To get started, an external table definition needs to be created. Here is the command to create it:

bq mkdef \
    --source_format=PARQUET \
    --hive_partitioning_mode=AUTO \
    --hive_partitioning_source_uri_prefix=gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/ \
    gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/*.parquet > taxi-table-def

taxi-table-def should look something like:

{
  "hivePartitioningOptions": {
    "mode": "AUTO",
    "sourceUriPrefix": "gs://{PROJET_ID}/new-york-taxi-trips/yellow/2018/"
  },
  "sourceFormat": "PARQUET",
  "sourceUris": [
    "gs://{PROJET_ID}/new-york-taxi-trips/yellow/2018/*.parquet"
  ]
}

With the definition file in hand, the next step is to create the external table in BigQuery. This can be accomplished with the following command:

bq mk --external_table_definition=taxi-table-def ${PROJECT_ID}:nyc_taxi.2018_external

Note: This assumes you already have a Dataset named nyc_taxi. If you don’t, now would be a great time to create it (see the command below), or change the above command to match the name of the Dataset you want to add the external table reference to.
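
Creating the Dataset is quick with bq; something like this should do it, assuming the nyc_taxi name used throughout this post:

bq mk --dataset ${PROJECT_ID}:nyc_taxi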

After running the above, you should get a message saying Table '${PROJECT_ID}:nyc_taxi.2018_external' successfully created.

If the table was successfully created, it should also appear in the BigQuery UI as an external table available to query.
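
You can also check it from the command line; bq show will print the table’s metadata, which should include the external data configuration and the Hive partitioning options:

bq show --format=prettyjson ${PROJECT_ID}:nyc_taxi.2018_external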

Query a BigQuery External Table

The final (and easiest) step is to query the Hive Partitioned Parquet files, which requires nothing special at all. The query semantics for an external table are exactly the same as for a normal table.

Let’s run a few queries to validate that things are working as they should.

SELECT count(*) FROM nyc_taxi.2018_external
-- Query complete (13.9 sec elapsed, 20.9 GB processed)
-- Result: 112234626 

SELECT count(*) FROM `bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018`
-- Query complete (0.6 sec elapsed, 0 B processed)
-- Result: 112234626

Looks great! There are the same number of records in the external table as in the original source table. The only difference between the two queries is the run time and the bytes processed, which is to be expected, since one query has to scan the external Parquet files while the other can be answered from the native table’s metadata (hence the 0 B processed).

Now, let’s see if BigQuery is able to reduce the number of bytes processed when querying only a specific set of partitions, in this case the month of January:

SELECT count(*)
FROM nyc_taxi.2018_external
WHERE trip_date BETWEEN "2018-01-01" AND "2018-01-31"
-- Query complete (9.1 sec elapsed, 1.6 GB processed)
-- 8760090

Again, looks great! Clearly, BigQuery was able to reduce the bytes scanned dramatically (1.6 GB vs 20.9 GB). In fact, if we look at the Execution Details:

BigQuery Execution Details

we can see that the query started with the full 8,760,090 rows for January and narrowed down to 217. This number is not random; it maps directly to the number of files that exist in GCS. Let’s verify:

❯ gsutil ls gs://${PROJECT_ID}/new-york-taxi-trips/yellow/2018/trip_date=2018-01\* | grep parquet | wc
     217     217   27342

Perfect, the number of Parquet files under the January partitions (trip_date=2018-01-*) is indeed 217.

One final thing to verify is whether the number of bytes processed decreases based on the columns specified in the SELECT, which would prove that BigQuery is not only taking advantage of the Hive Partitions, but also of the columnar Parquet format.

SELECT pickup_datetime FROM nyc_taxi.2018_external
-- Query complete (16.0 sec elapsed, 2.2 GB processed)

Again, as expected, the number of bytes processed was narrowed down (2.2 GB vs 20.9 GB).
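
To see both optimizations at work together, you could filter on the partition key and select a single column in the same query, for example by running it through the bq command line (results omitted here):

bq query --use_legacy_sql=false \
'SELECT pickup_datetime
FROM nyc_taxi.2018_external
WHERE trip_date BETWEEN "2018-01-01" AND "2018-01-31"'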

Another great feature of external tables is that you can join them with any other table, external or not, which makes querying external Parquet files seamless.

SELECT
  COUNT(*) AS total_trips,
  tr.pickup_location_id,
  geo.zone_name
FROM
  nyc_taxi.2018_external tr,
  `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom` geo
WHERE
  tr.pickup_location_id = geo.zone_id
GROUP BY
  tr.pickup_location_id,
  geo.zone_name
ORDER BY
  total_trips DESC

This query aggregates the number of pickups by zone_id and then joins the id with the public Dataset table taxi_zone_geom.

Row  total_trips  pickup_location_id  zone_name
1    4629205      237                 Upper East Side South
2    4317981      161                 Midtown Center
3    4203814      236                 Upper East Side North
4    3944764      162                 Midtown East
5    3821688      230                 Times Sq/Theatre District

The most pickups in 2018 were in the Upper East Side South, who would have known? :man_shrugging:

Conclusion

Querying Hive Partitioned Parquet files directly from BigQuery is a very exciting and impressive new feature. The thing I especially like about it is that you can transparently query across external and regular tables without any fuss. Countless use cases come to mind, and I can’t wait to see how folks use it in their day-to-day data operations.