Google Analytics ETL

2019-07-04
Python Pandas ETL API Google Analytics




Google Analytics is really useful, but sometimes you want to retrieve all the data, perform your own analysis and create your own reports. In this post you will see how to do that with Python.

1. Google permissions and keys

1.1. Get the Google key

The first thing to do is to download the Google key. To do so, you only need to follow the Google Analytics API steps.

To sum them up you will need to:

  1. Go to the Google service accounts page.
  2. Make sure to select the desired project from the navigation bar (at the top).
  3. Start creating a new service account.
  4. Give it some roles (optional).
  5. Create a JSON private key.
  6. Download that key. It will allow you to access Google data.
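
Before moving on, it can help to confirm that the downloaded file really is a service account key and to note the account email, which is needed in step 1.3. A minimal sketch, assuming the usual JSON key layout with the type and client_email fields; the function name read_service_account_email is made up for this example:

```python
import json


def read_service_account_email(key_path):
    """Return the service account email stored in a JSON key file."""
    with open(key_path) as key_file:
        key = json.load(key_file)

    # Service account keys declare their type explicitly
    if key.get("type") != "service_account":
        raise ValueError("Not a service account key: {}".format(key_path))

    return key["client_email"]
```

Calling it with the path of your key should return an address ending in iam.gserviceaccount.com.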

1.2. Enable Google Analytics API

To enable the Google Analytics API:

  1. Go to API in the Google Developers console.
  2. Make sure to select the desired project from the navigation bar (at the top).
  3. Click Enable.

1.3. Allow the user to see Google Analytics data

In the previous step you created an account with an email similar to your_project@your_web.iam.gserviceaccount.com (with your own values in place of your_project and your_web). Now give it read permissions:

  1. Go to the Google Analytics website.
  2. Select the Admin section from the left bar.
  3. Click Manage Users at the account level. The permission must be granted at the account level for this to work.
  4. Add the user with the email from the previous step.
  5. Give it read access.

It can take some time until the new user has read permissions.

1.4. Profile ID

Get the ID that you will need to use with the API. Google Analytics has three levels (account, property and view) and the ID you need is in the last one, the one with the red mark in the image.

google_analytics_profile

2. Test the connection

You can run the test_conexion.py code to check that everything is working.

test_conexion.py
from datetime import date, timedelta

from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials


# These are the two constants you need to change
FILE_GA_KEY = ""  # Path to the key file
PROFILE = ""  # Profile ID


def get_ga_service():
    """ Connect to GA API service"""

    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        FILE_GA_KEY, scopes=["https://www.googleapis.com/auth/analytics.readonly"]
    )

    # Build the service object.
    return build("analytics", "v3", credentials=credentials)


def test():
    """ Tests the connection """

    service = get_ga_service()

    # Feel free to change those values
    end = date.today()
    dimensions = ["ga:date", "ga:deviceCategory"]
    metrics = ["ga:users", "ga:sessions"]
    start = end - timedelta(7)

    # Query the API
    kwa = {
        "ids": "ga:{}".format(PROFILE),
        "start_date": "{:%Y-%m-%d}".format(start),
        "end_date": "{:%Y-%m-%d}".format(end),
        "metrics": ",".join(metrics),
        "dimensions": ",".join(dimensions),
        "max_results": 20,
    }

    return service.data().ga().get(**kwa).execute()

Remember to edit both the FILE_GA_KEY and PROFILE constants using the values from the previous steps.
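
To check what came back, you can look at the column headers and the rows of the response. The sketch below works on a hand-written dictionary that imitates the layout of a response (the values are invented, not real data). It also assumes that the rows key may be absent when a query matches nothing, so it falls back to an empty list:

```python
def summarize_response(data):
    """Return the column names and the number of rows in a GA response."""
    columns = [header["name"] for header in data["columnHeaders"]]

    # Assumption: "rows" may be missing when there are no results
    rows = data.get("rows", [])

    return columns, len(rows)


# Invented sample with the same layout as the API response
sample = {
    "columnHeaders": [
        {"name": "ga:date"},
        {"name": "ga:deviceCategory"},
        {"name": "ga:users"},
        {"name": "ga:sessions"},
    ],
    "rows": [
        ["20190701", "desktop", "120", "150"],
        ["20190701", "mobile", "80", "95"],
    ],
}

columns, n_rows = summarize_response(sample)
print(columns, n_rows)
```

With real data you would pass the result of test() instead of sample.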

3. Building the ETL

3.1. Decide metrics and dimensions

Now is the time to decide what you want to download. There are two types of parameters: metrics and dimensions.

You can check the possible values for both with the Query Explorer, where you can also try out your queries.

3.2. Extract and transform the data

In this step I will describe the functions in the etl.py file.

The first thing to do is to create the Google Analytics service with:

etl.py
def get_ga_service(key_file_location=None):
    """ Connect to GA API service"""

    if key_file_location is None:
        key_file_location = c.FILE_GA_KEY

    scope = "https://www.googleapis.com/auth/analytics.readonly"

    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        key_file_location, scopes=[scope]
    )

    # Build the service object.
    return build("analytics", "v3", credentials=credentials)

This function takes the default path of the Google Analytics key from the constants while allowing you to pass a custom one.

Then, to retrieve the actual data, you should use the get_ga_df function.

etl.py
def get_ga_df(query_data, end=None, service=None):
    """ Retrieve GA data as dataframe """

    # Resolve the default at call time, not when the module is imported
    if end is None:
        end = date.today()

    if service is None:
        service = get_ga_service()

    dimensions = query_data["dimensions"].keys()
    metrics = query_data["metrics"].keys()

    start = end - timedelta(c.TIMEWINDOW)

    # Query the API
    kwa = {
        "ids": "ga:{}".format(c.PROFILE),
        "start_date": "{:%Y-%m-%d}".format(start),
        "end_date": "{:%Y-%m-%d}".format(end),
        "metrics": ",".join(metrics),
        "dimensions": ",".join(dimensions),
        "max_results": c.MAX_RESULTS,
    }
    data = service.data().ga().get(**kwa).execute()

    # Create df from data obtained through the API
    columns = [x["name"] for x in data["columnHeaders"]]
    df = pd.DataFrame(data["rows"], columns=columns)

    # Handle missing values
    for x in dimensions:
        df[x] = df[x].replace("(not set)", float("nan"))

    # handle missing campaigns
    if ("ga:adwordsCampaignID" in df.columns) and ("ga:campaign" in df.columns):
        df.dropna(subset=["ga:adwordsCampaignID", "ga:campaign"], inplace=True)

    # Rename columns
    rename_dict = {i: x[0] for i, x in query_data["dimensions"].items()}
    rename_dict.update({i: x[0] for i, x in query_data["metrics"].items()})
    df = df.rename(index=str, columns=rename_dict)

    # Transform types
    for x in ["dimensions", "metrics"]:
        df = u.fix_types(df, query_data[x].values())

    log.info("Data read")

    return df

This function retrieves a pandas dataframe using the requested parameters. After creating the raw dataframe, it renames the columns using the dictionary QUERY_DATA from the constants.py file. It also changes the column types using the same dictionary.

So, for example, having:

constants.py
QUERY_DATA = {
  "traffic_google": {
      "dimensions": {"ga:date": ("date", "date"), "ga:deviceCategory": ("device", str)},
      "metrics": {
          "ga:users": ("users", int),
          "ga:sessions": ("sessions", int),
          "ga:bounceRate": ("bounce_rate", float),
      },
  },
}

will retrieve the ga:deviceCategory dimension, rename the column to device and store it as a string.
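
The fix_types helper lives in a utilities module that is not shown in this post. A minimal sketch of what such a helper could look like, assuming each entry is a (column name, type) pair as in QUERY_DATA, that the string "date" marks a date column, and that the API returns dates as YYYYMMDD strings:

```python
import pandas as pd


def fix_types(df, columns_info):
    """Cast each column of df to the type declared in columns_info."""
    df = df.copy()

    for name, dtype in columns_info:
        if name not in df.columns:
            continue

        if dtype == "date":
            # Assumption: dates arrive as strings like "20190704"
            df[name] = pd.to_datetime(df[name], format="%Y%m%d")
        elif dtype is str:
            # Values already arrive as text; skipping keeps NaN as missing
            continue
        else:
            df[name] = df[name].astype(dtype)

    return df


# Invented rows, already renamed the way get_ga_df does
df = pd.DataFrame(
    {"date": ["20190701"], "device": ["mobile"], "users": ["12"], "bounce_rate": ["45.5"]}
)
types = [("date", "date"), ("device", str), ("users", int), ("bounce_rate", float)]
df = fix_types(df, types)
print(df.dtypes)
```

String columns are left untouched on purpose: casting them with astype(str) would turn missing values into the text "nan".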

Finally, to run the ETL for all the tables in QUERY_DATA, there is the do_etl function:

etl.py
def do_etl():
    """ Reads from GA, transforms the data and loads into mysql """

    time0 = time()

    for tablename, data in c.QUERY_DATA.items():

        # Retrieve data from GA API
        df = get_ga_df(data)

        # Insert data into mysql
        insert_into_mysql(df, tablename)

    log.info("Data imported", time=time() - time0)

This is what you should run every day. By default it will take the data from the last 7 days. This redundancy means that even if the ETL fails one day, it will automatically retrieve the missing data the next day. To lose data it would have to fail for 7 consecutive days.

The last function is for retrieving all the old data. It is meant to be used only the first time.
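
To see why the overlap protects against failed runs, the sketch below lists which days end up requested for a given set of successful runs. It mirrors the window used in get_ga_df (from end minus the window up to end, both included) and assumes TIMEWINDOW is 7. The function name and the dates are invented for the example:

```python
from datetime import date, timedelta

TIMEWINDOW = 7  # Assumed value of c.TIMEWINDOW


def covered_days(run_dates, window=TIMEWINDOW):
    """Return every day requested by at least one successful run."""
    days = set()
    for end in run_dates:
        start = end - timedelta(window)
        # Both start and end are included in the request
        for offset in range(window + 1):
            days.add(start + timedelta(offset))
    return days


# The ETL works on July 1st, fails on the 2nd and 3rd, and works again on the 4th
runs = [date(2019, 7, 1), date(2019, 7, 4)]
days = covered_days(runs)

print(date(2019, 7, 2) in days)
print(date(2019, 7, 3) in days)
```

The days of the two failed runs are picked up by the run on the 4th, whose window reaches back to June 27th.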

etl.py
def import_old_stats(end=None):
    """ Imports old data up until 2017-01-01 """

    # Resolve the default at call time, not when the module is imported
    if end is None:
        end = date.today()

    # Load from 2017-01-01
    while end > date(2017, 1, 1):

        time0 = time()
        start = end - timedelta(c.TIMEWINDOW)

        log.info(f"Importing data from {start:%Y-%m-%d} to {end:%Y-%m-%d}")

        for tablename, data in c.QUERY_DATA.items():

            # Retrieve data from GA API
            df = get_ga_df(data, end=end)

            # Insert data into mysql
            insert_into_mysql(df, tablename)

        # Move to the window that ends the day before this one starts
        end -= timedelta(int(c.TIMEWINDOW) + 1)
        log.info("Data imported", time=time() - time0)

Right now it will download all data from 2017 until today.
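
The loop walks backwards in windows that do not overlap: each new end is one day before the previous start. A small sketch that only computes those windows, without calling the API, assuming TIMEWINDOW is 7 (the helper name backfill_windows is hypothetical):

```python
from datetime import date, timedelta


def backfill_windows(end, stop=date(2017, 1, 1), window=7):
    """Yield (start, end) pairs going back in time until stop is passed."""
    while end > stop:
        start = end - timedelta(window)
        yield start, end
        # Next window ends the day before this one starts
        end -= timedelta(window + 1)


windows = list(backfill_windows(date(2017, 1, 20)))
for start, end in windows:
    print("{:%Y-%m-%d} to {:%Y-%m-%d}".format(start, end))
```

Note that the last window can start a few days before the stop date, since only the end of each window is compared against it.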

3.3. Load the data

In this example I am going to show how to insert the data into MySQL. All this code is from the sql.py file.

3.3.1. Handling duplicated data

The idea is that the ETL can be run more than once without creating duplicated data. This matters because the ETL is designed with redundancy to tolerate some failures, so the same days are downloaded several times and duplicates must be handled.

There are two approaches to this problem:

3.3.1.1. Using date as identifier

Every time you download data you will have a dataframe with info from the last 7 days. To avoid duplicates you can simply delete all existing rows for those same days before inserting the new ones.
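
The DELETE constant used later by insert_into_mysql is defined in constants.py and is not shown in this post. A template compatible with the way it is called could look like the following. The exact wording of the template is an assumption, and the table name and dates are only examples:

```python
# Hypothetical template; the real one lives in constants.py
DELETE = "DELETE FROM {table} WHERE date IN ('{dates}')"

dates = ["2019-07-01", "2019-07-02"]

# Same formatting call as in insert_into_mysql
sentence = DELETE.format(table="traffic_google", dates="','".join(dates))
print(sentence)
# DELETE FROM traffic_google WHERE date IN ('2019-07-01','2019-07-02')
```

Building the statement with string formatting is only reasonable here because the dates come from the dataframe itself and not from user input.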

3.3.1.2. Using all 'index' columns

The other approach is to load all the existing data and concatenate it with the new data. The resulting dataframe will have duplicates, but they can be removed with the pandas drop_duplicates function. To do so you need to specify which columns define a unique entry.

This solution is less efficient since it needs to load all the existing data, which can be time consuming.
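
As a small illustration of this second approach, the sketch below merges two invented dataframes and keeps one row per unique entry. The column names and values are made up for the example:

```python
import pandas as pd

# Invented data: rows already stored and rows from a new download
df_prev = pd.DataFrame(
    {"campaign": ["a", "b"], "device": ["mobile", "mobile"], "users": [10, 20]}
)
df_new = pd.DataFrame(
    {"campaign": ["b", "c"], "device": ["mobile", "desktop"], "users": [25, 5]}
)

# Columns that identify a unique entry
cols_id = ["campaign", "device"]

# New rows go first so that, on a clash, the fresh value is the one kept
df = pd.concat([df_new, df_prev], sort=False)
df = df.drop_duplicates(subset=cols_id).reset_index(drop=True)
print(df)
```

Since drop_duplicates keeps the first occurrence by default, the order of the dataframes in concat decides which version of a repeated row survives.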

3.3.2. Load code

The first thing to do is to create the SQLAlchemy engine for MySQL. For other SQL engines check the SQLAlchemy post.

sql.py
def _get_mysql_engine():
    """ Get MySQL SQLAlchemy engine """

    return sa.create_engine(
        sa.engine.url.URL(
            drivername="mysql+pymysql",
            username="username",  # Change that!!
            password="password",  # Change that!!
            host="host",  # Change that!!
            port=c.PORT,
            database=c.DATABASE,
        ),
        encoding="utf-8",  # Since there will be some Japanese chars
    )

Using the create_engine and engine.url.URL functions from SQLAlchemy is the most elegant way to create the engine.
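
Depending on the SQLAlchemy version you have installed, building the URL object by calling its constructor directly may not work. From version 1.4 on, the library provides URL.create for this. A sketch with placeholder values (the credentials, port and database name below are invented; in this post the last two come from constants.py):

```python
from sqlalchemy.engine import URL

# Placeholder values, replace them with your own
url = URL.create(
    drivername="mysql+pymysql",
    username="username",
    password="password",
    host="host",
    port=3306,
    database="database",
)

# Building the URL does not connect to the database
print(url.render_as_string(hide_password=True))
```

The resulting object can be passed to create_engine in the same way as the URL in the function above.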

To retrieve a table you can use:

sql.py
def get_df_mysql(tablename, columns="*", engine=None):
    """ Retrieves one table from MySQL """

    if engine is None:
        engine = _get_mysql_engine()

    query = "SELECT {} FROM {}".format(columns, tablename)

    with engine.connect() as connection:
        return pd.read_sql_query(query, connection)

And finally the code to insert the data handling the duplicates:

sql.py
def insert_into_mysql(df, tablename, cols_id=None, engine=None):
    """
        Insert dataframe into mysql.
        If date column is present it will delete all rows that are in the same
        date range as df to be inserted. Else it will truncate the sql table.
        Args:
            df:         dataframe to insert
            tablename:  name of the table
            cols_id:    allow dropping duplicates using those columns instead of deleting all
            engine:     sql_alchemy engine
    """

    if engine is None:
        engine = _get_mysql_engine()

    # If date is present, delete rows of same date
    if "date" in df.columns:
        # Transform date to string to avoid SQL problems
        df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d")

        # Get all dates present in df to insert
        dates = df["date"].unique()

        # Delete data of same dates as the data that will be inserted
        sentence = c.DELETE.format(table=tablename, dates="','".join(dates))
        with engine.connect() as connection:
            connection.execute(sentence)

    # Truncate all data or keep non duplicated
    else:

        # Try to merge with existing data
        if cols_id is not None:
            # Retrieve existing data
            df_prev = get_df_mysql(tablename, engine=engine)

            # Drop duplicated data
            df = pd.concat([df, df_prev], sort=False)
            df.drop_duplicates(subset=cols_id, inplace=True)

        with engine.connect() as connection:
            connection.execute(c.TRUNCATE.format(tablename))

    # Insert into SQL
    df.to_sql(name=tablename, con=engine, if_exists="append", index=False)

This code checks whether the date column is present. If it is, duplicates are handled using it. If not, it uses the other approach when cols_id is given, and otherwise it replaces the whole table.
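
To see the date based approach working end to end without a MySQL server, the sketch below reproduces the delete then append idea on an in-memory SQLite database from the standard library. It is a simplified stand-in for insert_into_mysql, not the same code: the function name, table and data are invented, and the delete uses query parameters instead of a formatted string.

```python
import sqlite3

import pandas as pd


def insert_by_date(df, tablename, connection):
    """Delete the rows that share a date with df, then append df."""
    dates = df["date"].unique().tolist()
    placeholders = ",".join("?" for _ in dates)
    connection.execute(
        "DELETE FROM {} WHERE date IN ({})".format(tablename, placeholders), dates
    )
    df.to_sql(name=tablename, con=connection, if_exists="append", index=False)


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE traffic (date TEXT, device TEXT, users INTEGER)")

# Invented data for two days
df = pd.DataFrame(
    {
        "date": ["2019-07-01", "2019-07-02"],
        "device": ["mobile", "desktop"],
        "users": [10, 20],
    }
)

insert_by_date(df, "traffic", connection)
insert_by_date(df, "traffic", connection)  # Second run with the same days

total = pd.read_sql_query("SELECT COUNT(*) AS n FROM traffic", connection)["n"].iloc[0]
print(total)
```

Running the insert twice leaves two rows in the table, not four, which is the behaviour the daily ETL relies on.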

And with all that you can create an ETL to load data from Google Analytics.