REAL-TIME BIKE AVAILABILITY
By Yassine – April 8, 2025

In this project, I will demonstrate how to store real-time data in BigQuery using Pub/Sub.
I will retrieve the status of bike stations in Toulouse from an external API; the retrieval will be triggered every minute by Cloud Scheduler.
Prerequisites
Before getting started, make sure you have the following requirements ready:
- An active Google Cloud Platform (GCP) account.
- A valid API key obtained from api.jcdecaux.com.
Generate an API Key
To access real-time data from bike stations, you’ll need to generate an API key.
Visit developer.jcdecaux.com to create your API key, which will be required throughout this project.
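If you want to confirm that your key works before building the pipeline, you can call the API directly. Below is a minimal sketch in Python; it assumes the JCDecaux VLS v1 stations endpoint and the toulouse contract, so adjust it if your setup differs.
import requests

API_KEY = "YOUR_API_KEY"  # key generated on developer.jcdecaux.com

# Fetch the current state of every Toulouse station in a single call.
resp = requests.get(
    "https://api.jcdecaux.com/vls/v1/stations",
    params={"contract": "toulouse", "apiKey": API_KEY},
    timeout=30,
)
resp.raise_for_status()
stations = resp.json()
print(f"Received {len(stations)} station records")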
BigQuery Table Creation
Create Datasets
We will begin by creating a staging dataset where the raw data retrieved from the API will be stored.
bq mk --dataset --location=US staging
Next, we will create a second dataset intended to store the cleaned and transformed data, which will be used for analysis purposes.
bq mk --dataset --location=US toulouse_bikes
Create Tables
Here, we will create two tables within the staging dataset: live_data and raw_data. These two tables serve distinct purposes.
The raw_data table will store the historical records of Toulouse bike stations, which will later be used for behavioral analysis.
On the other hand, the live_data table will be dedicated to real-time tracking of station availability. For this table, we only need to keep the most recent record, as this approach is more cost-effective for querying.
CREATE TABLE `staging.raw_data` (
  data STRING,  -- raw JSON payload delivered by the Pub/Sub subscription
  date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
CREATE TABLE `staging.live_data` (
  data STRING,
  date TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
The data field will store the raw JSON payload received from Pub/Sub, allowing us to preserve the original structure of the data for future processing or analysis.
The date field is of type TIMESTAMP and uses DEFAULT CURRENT_TIMESTAMP() to automatically record the exact time at which each message is inserted into the table.
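For reference, each row's data payload is a single station record, shaped roughly like the example below; the field names match the JSON paths extracted later in this post, while the values are purely illustrative.
# Illustrative station record (values are made up); the field names match
# the JSON paths used later with JSON_VALUE().
station = {
    "number": 1,
    "contract_name": "toulouse",
    "name": "00001 - EXAMPLE STATION",
    "address": "1 RUE EXEMPLE",
    "position": {"lat": 43.6047, "lng": 1.4442},
    "banking": True,
    "bonus": False,
    "bike_stands": 20,
    "available_bike_stands": 12,
    "available_bikes": 8,
    "status": "OPEN",
    "last_update": 1712570000000,  # epoch milliseconds
}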
Create PubSub Topic & Subscribers
Topic Creation
In this step, we create a single Pub/Sub topic named bike-project. This topic will act as a central communication channel to receive real-time data pushed by a Python script that fetches the bike availability information from the API.
gcloud pubsub topics create bike-project
Subscription Creation
Next, we create two Pub/Sub subscriptions attached to the bike-project topic. Each subscription automatically writes incoming messages into a specific BigQuery table within the staging dataset. One table stores the raw data, while the other holds the latest live view used for analytics and monitoring.
gcloud pubsub subscriptions create bq-view1 \
--topic=bike-project \
--bigquery-table=[PROJECT-ID]:staging.raw_data
gcloud pubsub subscriptions create bq-live-view1 \
--topic=bike-project \
--bigquery-table=[PROJECT-ID]:staging.live_data
Create a Cloud Run Function
The objective of the Cloud Run function you are about to create is to execute a Python script that retrieves data using the API key and publishes it to a Pub/Sub topic.
This setup allows the API to be called in a serverless and scalable way, ensuring the data is efficiently injected into the streaming pipeline.
Clone the repository into your GCP environment using the Bash command below.
git clone https://github.com/yassinelaghrabli/GCP_Projects.git
Navigate to the folder that contains the Cloud Run scripts using the command below.
cd GCP_Projects/ToulouseBikes/CloudRunFunctions
This function runs the Python script that calls the JCDecaux API; the command below deploys it along with the required environment variables.
In Cloud Run, there is no need to deploy a .env file. You can define environment variables using the --set-env-vars option, as shown below.
gcloud functions deploy publish_bike_data \
--gen2 \
--runtime=python310 \
--region=europe-west1 \
--entry-point=publish_bike_data \
--source=./ApiCall \
--trigger-http \
--allow-unauthenticated \
--memory=256Mi \
--cpu=0.25 \
--set-env-vars \
PROJECT_ID=[PROJECT-ID],TOPIC=[TOPIC-NAME],API_KEY=[API-KEY]
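For context, here is a minimal sketch of what the publish_bike_data entry point could look like. The real implementation lives in the ApiCall folder of the repository; the endpoint, contract name, and one-message-per-station format below are assumptions consistent with the tables and queries used in this post.
import json
import os

import functions_framework
import requests
from google.cloud import pubsub_v1

PROJECT_ID = os.environ["PROJECT_ID"]
TOPIC = os.environ["TOPIC"]
API_KEY = os.environ["API_KEY"]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)


@functions_framework.http
def publish_bike_data(request):
    """Fetches the Toulouse station list and publishes one message per station."""
    resp = requests.get(
        "https://api.jcdecaux.com/vls/v1/stations",
        params={"contract": "toulouse", "apiKey": API_KEY},
        timeout=30,
    )
    resp.raise_for_status()

    # Each station becomes one Pub/Sub message; the BigQuery subscriptions
    # write the JSON payload into the `data` column of the staging tables.
    futures = [
        publisher.publish(topic_path, json.dumps(station).encode("utf-8"))
        for station in resp.json()
    ]
    for future in futures:
        future.result()  # block until every message is accepted

    return f"Published {len(futures)} station records", 200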
You can now deploy the second function, which is designed to update the live_data table.
This table contains only the result of the latest API call, so when Looker Studio queries it every minute, the query cost remains minimal. Continuously querying a table that holds the full history would quickly become very expensive.
gcloud functions deploy live_data_clean \
--gen2 \
--runtime=python310 \
--region=europe-west1 \
--entry-point=live_data_clean \
--source=./RefreshData \
--trigger-http \
--allow-unauthenticated \
--memory=256Mi \
--cpu=0.25
I decided to use a Cloud Run function to execute the two SQL queries that refresh the data, instead of using the BigQuery scheduler. The main reason is that the BigQuery scheduler has a minimum interval of 5 minutes, whereas I needed the data to be refreshed every minute. Otherwise, using the BigQuery scheduler would have been the preferred option.
SQL Queries Explanation
The first query below deletes, for each station, every row except the one from the latest API call.
DELETE FROM `staging.live_data` AS t
WHERE date < (
  SELECT MAX(sub.date)
  FROM `staging.live_data` AS sub
  WHERE JSON_VALUE(sub.data, '$.number') = JSON_VALUE(t.data, '$.number')
);
I chose this approach because a Pub/Sub BigQuery subscription cannot update existing rows; it can only append new ones. Since the goal is to keep only the latest record per station, this cleanup ensures that each refresh of the final table only has to handle one row per station.
The second script below sends the data to the final table live_data, which we will query from Looker Studio.
DECLARE table_row_count INT64;
DECLARE table_exists BOOL;

SET table_exists = EXISTS (
  SELECT 1
  FROM `toulouse_bikes.INFORMATION_SCHEMA.TABLES`
  WHERE table_name = 'live_data'
    AND table_schema = 'toulouse_bikes'
);

IF table_exists THEN
  SET table_row_count = (
    SELECT COUNT(*)
    FROM `toulouse_bikes.live_data`
  );
ELSE
  SET table_row_count = 0;
END IF;

IF NOT table_exists OR table_row_count = 0 THEN
  CREATE OR REPLACE TABLE `toulouse_bikes.live_data` AS
  SELECT
    JSON_VALUE(data, '$.number') AS number,
    JSON_VALUE(data, '$.contract_name') AS contract_name,
    JSON_VALUE(data, '$.name') AS name,
    JSON_VALUE(data, '$.address') AS address,
    CAST(JSON_VALUE(data, '$.position.lat') AS FLOAT64) AS latitude,
    CAST(JSON_VALUE(data, '$.position.lng') AS FLOAT64) AS longitude,
    CAST(JSON_VALUE(data, '$.banking') AS BOOL) AS banking,
    CAST(JSON_VALUE(data, '$.bonus') AS BOOL) AS bonus,
    CAST(JSON_VALUE(data, '$.bike_stands') AS INT64) AS bike_stands,
    CAST(JSON_VALUE(data, '$.available_bike_stands') AS INT64) AS available_bike_stands,
    CAST(JSON_VALUE(data, '$.available_bikes') AS INT64) AS available_bikes,
    JSON_VALUE(data, '$.status') AS status,
    CAST(JSON_VALUE(data, '$.last_update') AS INT64) AS last_update,
    date
  FROM `staging.live_data`
  QUALIFY ROW_NUMBER() OVER (
    PARTITION BY JSON_VALUE(data, '$.number')
    ORDER BY date DESC
  ) = 1;
ELSE
  MERGE INTO `toulouse_bikes.live_data` AS target
  USING (
    SELECT
      JSON_VALUE(data, '$.number') AS number,
      JSON_VALUE(data, '$.contract_name') AS contract_name,
      JSON_VALUE(data, '$.name') AS name,
      JSON_VALUE(data, '$.address') AS address,
      CAST(JSON_VALUE(data, '$.position.lat') AS FLOAT64) AS latitude,
      CAST(JSON_VALUE(data, '$.position.lng') AS FLOAT64) AS longitude,
      CAST(JSON_VALUE(data, '$.banking') AS BOOL) AS banking,
      CAST(JSON_VALUE(data, '$.bonus') AS BOOL) AS bonus,
      CAST(JSON_VALUE(data, '$.bike_stands') AS INT64) AS bike_stands,
      CAST(JSON_VALUE(data, '$.available_bike_stands') AS INT64) AS available_bike_stands,
      CAST(JSON_VALUE(data, '$.available_bikes') AS INT64) AS available_bikes,
      JSON_VALUE(data, '$.status') AS status,
      CAST(JSON_VALUE(data, '$.last_update') AS INT64) AS last_update,
      date
    FROM `staging.live_data`
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY JSON_VALUE(data, '$.number')
      ORDER BY date DESC
    ) = 1
  ) AS source
  ON target.number = source.number
  WHEN MATCHED THEN
    UPDATE SET
      target.contract_name = source.contract_name,
      target.name = source.name,
      target.address = source.address,
      target.latitude = source.latitude,
      target.longitude = source.longitude,
      target.banking = source.banking,
      target.bonus = source.bonus,
      target.bike_stands = source.bike_stands,
      target.available_bike_stands = source.available_bike_stands,
      target.available_bikes = source.available_bikes,
      target.status = source.status,
      target.last_update = source.last_update,
      target.date = source.date;
END IF;
This script first checks whether the final table already exists or if it is empty. If either of these conditions is true, the table will be created. Otherwise, if the table exists and is not empty, it will be updated using a merge operation.
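For context, the live_data_clean function only needs to execute these two statements in order with the BigQuery client. Here is a minimal sketch, assuming the SQL is stored in files deployed alongside the function code (the file names are hypothetical).
from pathlib import Path

import functions_framework
from google.cloud import bigquery

client = bigquery.Client()


@functions_framework.http
def live_data_clean(request):
    """Deduplicates staging.live_data, then refreshes toulouse_bikes.live_data."""
    for sql_file in ("dedupe_staging.sql", "refresh_live_data.sql"):
        query = Path(sql_file).read_text()
        client.query(query).result()  # wait for each statement to complete
    return "live_data refreshed", 200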
Cloud Scheduler Job Creation
Once the scripts are deployed, we need to schedule their execution every minute using Cloud Scheduler.
gcloud scheduler jobs create http GetApiData \
--schedule="* * * * *" \
--http-method=GET \
--uri=[Cloud_Run_Function_URL] \
--location="europe-west1" \
--time-zone="Europe/Paris" \
--oidc-service-account-email=[your-service-account]
This command lets you schedule the execution of your script every minute using Cloud Scheduler. Simply replace the function URL with the one associated with the Cloud Function you want to schedule, and provide the service account email that has permission to invoke the function.
To schedule the other Cloud Function, run the same command again, changing the job name and the function URL accordingly.
You can find the URL by clicking on the Cloud Run function name in the console.

Now that the data is available in BigQuery, you can choose how you want to visualize it. You can build a custom application or use a dashboarding tool like Power BI or Looker Studio to display real-time bike availability and usage trends.
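Before building a dashboard, you can run a quick sanity check on the final table, for example with the BigQuery Python client:
from google.cloud import bigquery

client = bigquery.Client()

# Latest availability per station, straight from the table refreshed every minute.
query = """
    SELECT name, available_bikes, available_bike_stands, date
    FROM `toulouse_bikes.live_data`
    ORDER BY date DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.name, row.available_bikes, row.available_bike_stands, row.date)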