Secure Cloud Functions With OIDC Account (from Cloud Scheduler) - Python

June 10, 2019

NOTE: Refer Secure Cloud Functions for Cloud Scheduler for latest update and implementation.

There are a few ways to secure cloud functions from unauthorize access:

I created a Cloud Functions crob job using Cloud Scheduler with OIDC account authentication, and the following code will verify the OIDC account provided by Cloud Scheduler.

Edit requirements.txt

# https://pypi.org/project/google-auth/
google-auth==1.6.3
# https://pypi.org/project/CacheControl/
CacheControl==0.12.5
# https://pypi.org/project/requests/
requests==2.22.0

Code

import os
from google.oauth2 import id_token
from flask import abort

log = logging.getLogger(__name__)

def test_acccount(request):
    authorization = request.headers.get('Authorization')
    token = None
    if authorization and authorization.startswith('Bearer '):
        token = authorization.split('Bearer ')[1]
    else:
        log.error('Invalid Authorization header')
        abort(401)

    try:
        # from google.auth.transport import requests
        # transport_request = requests.Request()
        import cachecontrol
        import google.auth.transport.requests
        import requests

        session = requests.session()
        cached_session = cachecontrol.CacheControl(session)
        transport_request = google.auth.transport.requests.Request(session=cached_session)

        HOST = request.headers.get('Host')
        REGION = os.environ.get('FUNCTION_REGION')
        PROJECT_ID = os.environ.get('GCP_PROJECT')
        FUNCTION_NAME = os.environ.get('FUNCTION_NAME')
        EMAIL = 'NAME@PROJECT_ID.iam.gserviceaccount.com'

        # audience = f'https://{HOST}/{FUNCTION_NAME}'
        audience = f'https://{REGION}-{PROJECT_ID}.cloudfunctions.net/{FUNCTION_NAME}'
        decoded_token = id_token.verify_oauth2_token(token, transport_request, audience)
        # log.info(f"decoded_token={decoded_token}")

        if decoded_token['iss'] != 'https://accounts.google.com':
            log.error(f"Wrong issuer: {decoded_token['iss']}")
            abort(401)

        # userid = decoded_token['sub']
        if decoded_token['email'] != EMAIL:
            log.error(f"Wrong email: {decoded_token['email']}")
            abort(401)

    except Exception as e: # ValueError or auth.AuthError
        log.error(e)
        abort(401)

    # do something
    return 'OK'

Python Decorators

import os
import cachecontrol
import google.auth.transport.requests
import requests
from functools import wraps
from google.oauth2 import id_token

def oidc_auth_required(email=None, audience=None):
    def decorator(f):
        @wraps(f)
        def wrapper(request):
            authorization = request.headers.get('Authorization')
            token = None
            if authorization and authorization.startswith('Bearer '):
                token = authorization.split('Bearer ')[1]
            else:
                log.error('Invalid Authorization header')
                abort(401)

            try:
                session = requests.session()
                cached_session = cachecontrol.CacheControl(session)
                transport_request = google.auth.transport.requests.Request(session=cached_session)

                _audience = audience
                if not _audience:
                    # HOST = request.headers.get('Host')
                    REGION = os.environ.get('FUNCTION_REGION')
                    PROJECT_ID = os.environ.get('GCP_PROJECT')
                    FUNCTION_NAME = os.environ.get('FUNCTION_NAME')

                    # audience = f'https://{HOST}/{FUNCTION_NAME}'
                    if FUNCTION_NAME:
                        _audience = f'https://{REGION}-{PROJECT_ID}.cloudfunctions.net/{FUNCTION_NAME}'

                decoded_token = id_token.verify_oauth2_token(token, transport_request, _audience)
                if decoded_token['iss'] != 'https://accounts.google.com':
                    log.error(f"Wrong issuer: {decoded_token['iss']}")
                    abort(401)

                # optional email verification
                if email and decoded_token['email'] != email:
                    log.error(f"Wrong email: {decoded_token['email']}")
                    abort(401)

                return f(request, decoded_token)
            except Exception as e: # ValueError or auth.AuthError
                log.error(e)
                abort(401)
        return wrapper
    return decorator

Usage

@oidc_auth_required('NAME@PROJECT_ID.iam.gserviceaccount.com')
def test_account(request, decoded_token):
    return decoded_token['email']

References:

This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.