GCP BigQuery

How to send data to BigQuery from Webhook Relay.

Prerequisites:

Google Cloud Platform account (free trial available)

Google Cloud project with BigQuery enabled (there’s a generous free tier available for BigQuery)

Dataset and table in BigQuery - https://cloud.google.com/bigquery/docs/tables

Webhook Relay provides a helper package, bigquery, that streams writes into Google Cloud BigQuery. To start ingesting webhook data straight into your BigQuery table, create a new Function and import the bigquery package:

-- Import BigQuery helper package
local bigquery = require('bigquery')

A new tab should appear asking you to set up credentials. Open that tab and follow the steps:

  1. Create a new service account with BigQuery Editor permissions.
  2. Download the service account's JSON key file.
  3. Copy and paste the file's contents into the form and click Save.

Streaming data into BigQuery

-- Import BigQuery helper package
local bigquery = require('bigquery')
local json = require("json")

-- Parsing payload
local rowData, err = json.decode(r.RequestBody)
if err then error(err) end

-- Initializing BigQuery client
err = bigquery.initialize('your-project-id', 'dataset-id', 'table-id')
if err then error(err) end

-- Receiving payload:
-- {
--     "hub_user_id": "user-id-here",
--     "category": "signup",
--     "action": "click",
--     "label": "github auth"
-- }

-- Insert row:
err = bigquery.insert(rowData)
if err then error(err) end

Check if record exists

A simple query to check whether a row exists by matching a column with a value:

local bigquery = require('bigquery')
local err = bigquery.initialize('your-project-id', 'dataset-id', 'table-id')
if err then error(err) end

local exists, err = bigquery.record_exists('name', 'john')
if err then error(err) end

if exists then
  -- OK
else
  error('Record not found')
end

Use cases:

  • You are working with a webhook that sends data about a user signing up. You want to check if the user already exists in your database before inserting a new row.
  • Each newly inserted row triggers an expensive operation, and you want to skip that operation when the row already exists.
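
The first use case can be sketched as a small helper that checks for a matching row and inserts only when none is found. The helper name insert_if_missing and its return values are hypothetical and not part of the bigquery package. It relies only on record_exists and insert as described on this page, and assumes the value in the checked column is a string:

```lua
-- Hypothetical helper, not part of the bigquery package.
-- `bq` is the already initialized bigquery package; it is passed in as an
-- argument so the logic can be exercised with a stub.
-- Returns true when a row was inserted, false when it was skipped or failed,
-- plus an error value (nil on success).
local function insert_if_missing(bq, column, row)
  -- record_exists expects a column name and a string value
  local exists, err = bq.record_exists(column, row[column])
  if err then return false, err end
  if exists then return false, nil end

  err = bq.insert(row)
  if err then return false, err end
  return true, nil
end
```

Inside a Function you would call it after bigquery.initialize, for example insert_if_missing(bigquery, 'hub_user_id', rowData), and run the expensive operation only when the first return value is true.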

Execute any command

To execute any SQL command on your table:

local bigquery = require('bigquery')
local err = bigquery.initialize('your-project-id', 'dataset-id', 'table-id')
if err then error(err) end

-- Delete old records of the matching category. Method 'exec' can take an arbitrary 
-- number of arguments, depending on how many ? you have in your query.
err = bigquery.exec('DELETE dataset-id.table-id WHERE category = ? AND country = ?', 'movies', 'US')
if err then error(err) end
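
Because exec takes one argument per ? in the query, a mismatch is easy to introduce when a query is edited. A minimal sketch of a guard follows. The wrapper name exec_checked is hypothetical, and the check is a simple character count, so it would also count a ? that appears inside a string literal in the query:

```lua
-- Hypothetical wrapper, not part of the bigquery package.
-- Counts the ? placeholders in the query and compares that count with the
-- number of arguments before handing the call to bq.exec.
-- Returns an error message (string) on mismatch, otherwise whatever
-- bq.exec returns.
local function exec_checked(bq, query, ...)
  local _, placeholders = query:gsub('%?', '')
  local args = select('#', ...)
  if placeholders ~= args then
    return string.format(
      'query has %d placeholders but %d arguments were given',
      placeholders, args)
  end
  return bq.exec(query, ...)
end
```

It is called the same way as exec, with the bigquery package as the first argument: exec_checked(bigquery, 'DELETE dataset-id.table-id WHERE category = ?', 'movies').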

BigQuery package API reference

In addition to initialize and exec shown above, the bigquery package exposes the following client methods:

Method name                    Parameter type    Description
insert(rowData)                Table             A key-value table that represents one row of data.
record_exists(column, value)   String, String    Checks whether a row with a matching column value exists.

Limitations

Currently our package doesn’t support nested objects. That means that a payload with a JSON structure such as:

{
    "hub_user_id": "user-id-here",
    "category": "signup",
    "action": "click",
    "label": "github auth",
    "nested_data": {
      "location": "GB",
      "date": "2020-05-10"
    }
}

will not be inserted successfully. Flatten the structure in your Function before inserting it.
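
One way to flatten a decoded payload is a short recursive helper. This is a sketch under stated assumptions: the name flatten and the underscore naming scheme (nested_data.location becomes nested_data_location) are illustrative choices, and your table needs columns with the resulting names:

```lua
-- Hypothetical helper, not part of the bigquery package.
-- Copies nested tables into a single-level table, joining key names with
-- an underscore. Assumes string keys, as in a decoded JSON object.
local function flatten(row, prefix, out)
  out = out or {}
  for key, value in pairs(row) do
    local name = prefix and (prefix .. '_' .. key) or key
    if type(value) == 'table' then
      -- descend into the nested object, carrying the key path as prefix
      flatten(value, name, out)
    else
      out[name] = value
    end
  end
  return out
end
```

With the payload above, flatten(rowData) yields the original top-level fields plus nested_data_location and nested_data_date, which can then be passed to bigquery.insert.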

Troubleshooting

A few things to note:

  • Ensure that the project ID, dataset ID and table ID passed to bigquery.initialize are set and correct.
  • The BigQuery table schema is defined by the user. You don’t have to write all the fields (most of them can be nullable), but if you try to write a field that doesn’t exist in the schema, BigQuery will refuse the write.
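
If a webhook may send fields that your table does not have, one option is to copy only the known columns before inserting. A minimal sketch, where the helper name pick_fields is hypothetical and the column list is maintained by hand to mirror your own schema:

```lua
-- Hypothetical helper, not part of the bigquery package.
-- Returns a copy of `row` that contains only the keys listed in `columns`.
-- Keys missing from `row` are left out, so nullable columns stay unset.
local function pick_fields(row, columns)
  local out = {}
  for _, name in ipairs(columns) do
    if row[name] ~= nil then
      out[name] = row[name]
    end
  end
  return out
end
```

For the example payload used on this page, the call would be pick_fields(rowData, {'hub_user_id', 'category', 'action', 'label'}), and the result is what you pass to bigquery.insert.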