Use a Python 3 Apache Beam pipeline
You can do lots of things in SQL, and SQL is undeniably convenient, but every once in a while, you will find yourself needing to run Python code on your BigQuery tables. If your data is small, you can use Pandas (and the BigQuery client library), but if your data is large, the best approach is to use Apache Beam and execute it in a serverless, autoscaled way with Cloud Dataflow.
Here’s the full code for the example on GitHub. It comes from our forthcoming book on BigQuery.
Python 3 Apache Beam + BigQuery
Here’s the key Beam code to read from BigQuery and write to BigQuery:
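import apache_beam as beam

# RUNNER, opts, and query are defined elsewhere in the full example.
with beam.Pipeline(RUNNER, options=opts) as p:
    (p
     # Run the query and emit one dict per result row.
     | 'read_bq' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
     # Fit a distribution per row; rows with a non-finite fit emit nothing.
     | 'compute_fit' >> beam.FlatMap(compute_fit)
     # Write the fitted parameters to the output table.
     | 'write_bq' >> beam.io.gcp.bigquery.WriteToBigQuery(
         'ch05eu.station_stats',
         schema='station_id:string,ag:FLOAT64,bg:FLOAT64,cg:FLOAT64')
    )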
Essentially, we run a query against BigQuery, apply the Python function compute_fit to each row of the result, and write the output to a BigQuery table (ch05eu.station_stats).
This is my compute_fit function. As you can see, it’s just plain Python code:
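def compute_fit(row):
    # Import inside the function so the modules are available on the workers.
    from scipy import stats
    import numpy as np
    durations = row['duration_array']
    # Fit a gamma distribution to this station's trip durations.
    ag, bg, cg = stats.gamma.fit(durations)
    # Emit a result only when all three fitted parameters are finite.
    if np.isfinite(ag) and np.isfinite(bg) and np.isfinite(cg):
        result = {}
        result['station_id'] = str(row['start_station_id'])
        result['ag'] = ag
        result['bg'] = bg
        result['cg'] = cg
        yield result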
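compute_fit uses yield rather than return because beam.FlatMap expects a function that produces an iterable and flattens whatever comes back, so a row whose fit is not finite simply contributes nothing to the output. Here is a minimal pure-Python sketch of that behavior, with no Beam or SciPy involved. Note that flat_map and mean_if_finite are hypothetical stand-ins made up for illustration (a simple mean replaces the gamma fit), not part of the pipeline above:

```python
import math
from itertools import chain


def flat_map(fn, rows):
    """Hypothetical stand-in for beam.FlatMap: apply fn to each row and
    flatten the iterables it produces into a single list."""
    return list(chain.from_iterable(fn(row) for row in rows))


def mean_if_finite(row):
    """Hypothetical stand-in for compute_fit: yield one result dict when the
    statistic is finite, and nothing at all otherwise."""
    durations = row['duration_array']
    mean = sum(durations) / len(durations) if durations else float('nan')
    if math.isfinite(mean):
        yield {'station_id': str(row['start_station_id']), 'mean': mean}


rows = [
    {'start_station_id': 1, 'duration_array': [60, 120, 180]},
    {'start_station_id': 2, 'duration_array': []},  # no data, so no output row
]
results = flat_map(mean_if_finite, rows)
print(results)
```

Only station 1 appears in results. The empty station is dropped without any explicit filtering step, which is the same effect the isfinite check has in compute_fit.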
Make sure to specify the Python packages that you need installed on the Dataflow workers in a requirements.txt:
%%writefile requirements.txt
numpy
scipy
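Beam has to be told about that file, and one way to do so is the --requirements_file pipeline option. Below is a rough sketch of building the option flags. The helper dataflow_flags and the project, bucket, and region values are placeholders I made up for illustration, and the full example may set its options differently:

```python
def dataflow_flags(project, bucket, region):
    """Build command-line style pipeline flags for a Dataflow run.

    Hypothetical helper: project, bucket, and region are placeholders
    supplied by the caller.
    """
    return [
        f'--project={project}',
        f'--region={region}',
        f'--temp_location=gs://{bucket}/tmp/',
        # Points Beam at the pip packages to install on each worker.
        '--requirements_file=requirements.txt',
    ]


flags = dataflow_flags('my-project', 'my-bucket', 'europe-west2')
for flag in flags:
    print(flag)
# With Beam installed, something like
#   opts = beam.pipeline.PipelineOptions(flags=flags)
# would turn these into the opts object passed to beam.Pipeline.
```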
Enjoy!