Secure Cloud Functions With OIDC Account (from Cloud Scheduler) - Python

NOTE: Refer to Secure Cloud Functions for Cloud Scheduler for the latest updates and implementation.

There are a few ways to secure Cloud Functions from unauthorized access:

I created a Cloud Functions cron job using Cloud Scheduler with OIDC account authentication. The following code verifies the OIDC token that Cloud Scheduler sends with each request.

Edit requirements.txt

# https://pypi.org/project/google-auth/
google-auth==1.6.3
# https://pypi.org/project/CacheControl/
CacheControl==0.12.5
# https://pypi.org/project/requests/
requests==2.22.0

Code

import logging
import os

import cachecontrol
import google.auth.transport.requests
import requests
from flask import abort
from google.oauth2 import id_token

log = logging.getLogger(__name__)

_BEARER_PREFIX = 'Bearer '
# Google documents both spellings as valid issuers for the ID tokens it signs.
_GOOGLE_ISSUERS = ('accounts.google.com', 'https://accounts.google.com')
# Service account configured as the OIDC identity of the Cloud Scheduler job.
_EXPECTED_EMAIL = 'NAME@PROJECT_ID.iam.gserviceaccount.com'


def test_acccount(request):
    """Handle an HTTP request only if it carries a valid Google OIDC token.

    The bearer token from the ``Authorization`` header is verified against
    Google's public certificates, with this function's HTTPS trigger URL as
    the expected audience. The issuer and the service-account email claims
    are then checked.

    Args:
        request: The incoming Flask request object.

    Returns:
        The string ``'OK'`` when the caller is authorized.

    Raises:
        werkzeug.exceptions.Unauthorized: via ``abort(401)`` when the header
            is missing or malformed, the token cannot be verified, or a
            claim does not match.
    """
    authorization = request.headers.get('Authorization')
    if not authorization or not authorization.startswith(_BEARER_PREFIX):
        log.error('Invalid Authorization header')
        abort(401)
    token = authorization[len(_BEARER_PREFIX):]
    if not token:
        log.error('Invalid Authorization header')
        abort(401)

    # NOTE(review): these variables are read from the runtime environment;
    # confirm that your runtime actually sets them.
    region = os.environ.get('FUNCTION_REGION')
    project_id = os.environ.get('GCP_PROJECT')
    function_name = os.environ.get('FUNCTION_NAME')
    if not (region and project_id and function_name):
        # Without this guard the audience would silently become
        # 'https://None-None.cloudfunctions.net/None' and every call would
        # fail verification with a confusing message.
        log.error(
            'Cannot build audience: FUNCTION_REGION, GCP_PROJECT and '
            'FUNCTION_NAME must all be set'
        )
        abort(401)
    audience = f'https://{region}-{project_id}.cloudfunctions.net/{function_name}'

    # Wrap the session with CacheControl so that fetching Google's public
    # certificates goes through an HTTP cache.
    cached_session = cachecontrol.CacheControl(requests.session())
    transport_request = google.auth.transport.requests.Request(session=cached_session)

    # Keep the try body to the verification call only, so the abort() calls
    # below are not caught and logged a second time.
    try:
        decoded_token = id_token.verify_oauth2_token(token, transport_request, audience)
    except Exception:
        # Authentication boundary: fail closed on any verification error.
        log.exception('ID token verification failed')
        abort(401)

    issuer = decoded_token.get('iss')
    if issuer not in _GOOGLE_ISSUERS:
        log.error('Wrong issuer: %s', issuer)
        abort(401)

    # .get() so that a token without an email claim is rejected cleanly
    # instead of raising KeyError.
    email = decoded_token.get('email')
    if email != _EXPECTED_EMAIL:
        log.error('Wrong email: %s', email)
        abort(401)

    # do something
    return 'OK'

Python Decorators

import logging
import os
from functools import wraps

import cachecontrol
import google.auth.transport.requests
import requests
from flask import abort
from google.oauth2 import id_token

log = logging.getLogger(__name__)

_BEARER_PREFIX = 'Bearer '
# Google documents both spellings as valid issuers for the ID tokens it signs.
_GOOGLE_ISSUERS = ('accounts.google.com', 'https://accounts.google.com')


def _default_audience():
    """Return this function's HTTPS trigger URL, or None if it cannot be built."""
    # NOTE(review): these variables are read from the runtime environment;
    # confirm that your runtime sets them, otherwise pass `audience` explicitly.
    region = os.environ.get('FUNCTION_REGION')
    project_id = os.environ.get('GCP_PROJECT')
    function_name = os.environ.get('FUNCTION_NAME')
    if not (region and project_id and function_name):
        return None
    return f'https://{region}-{project_id}.cloudfunctions.net/{function_name}'


def oidc_auth_required(email=None, audience=None):
    """Build a decorator that protects an HTTP function with Google OIDC auth.

    The decorated handler is called as ``f(request, decoded_token)`` only
    after the bearer token in the ``Authorization`` header has been verified.

    Args:
        email: Optional service-account email that the token's ``email``
            claim must match. When falsy, the email is not checked.
        audience: Optional expected ``aud`` claim. When falsy, the function's
            default trigger URL is derived from the runtime environment.

    Returns:
        A decorator for a handler with signature ``(request, decoded_token)``.
        The resulting wrapper takes ``request`` only.

    Raises:
        werkzeug.exceptions.Unauthorized: raised by the wrapper via
            ``abort(401)`` when the header is missing or malformed, no
            audience can be determined, the token cannot be verified, or a
            claim does not match.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(request):
            authorization = request.headers.get('Authorization')
            if not authorization or not authorization.startswith(_BEARER_PREFIX):
                log.error('Invalid Authorization header')
                abort(401)
            token = authorization[len(_BEARER_PREFIX):]
            if not token:
                log.error('Invalid Authorization header')
                abort(401)

            expected_audience = audience or _default_audience()
            if not expected_audience:
                # Fail closed: verifying with audience=None would skip the
                # audience check and accept tokens minted for other services.
                log.error(
                    'Cannot determine audience: pass `audience` or set '
                    'FUNCTION_REGION, GCP_PROJECT and FUNCTION_NAME'
                )
                abort(401)

            # Wrap the session with CacheControl so that fetching Google's
            # public certificates goes through an HTTP cache.
            cached_session = cachecontrol.CacheControl(requests.session())
            transport_request = google.auth.transport.requests.Request(
                session=cached_session
            )

            # Only the verification call is guarded, so errors raised by the
            # wrapped handler are not swallowed and turned into a 401.
            try:
                decoded_token = id_token.verify_oauth2_token(
                    token, transport_request, expected_audience
                )
            except Exception:
                # Authentication boundary: fail closed on any verification error.
                log.exception('ID token verification failed')
                abort(401)

            issuer = decoded_token.get('iss')
            if issuer not in _GOOGLE_ISSUERS:
                log.error('Wrong issuer: %s', issuer)
                abort(401)

            # Optional email verification; .get() rejects a token without an
            # email claim cleanly instead of raising KeyError.
            if email and decoded_token.get('email') != email:
                log.error('Wrong email: %s', decoded_token.get('email'))
                abort(401)

            return f(request, decoded_token)
        return wrapper
    return decorator

Usage

@oidc_auth_required(email='NAME@PROJECT_ID.iam.gserviceaccount.com')
def test_account(request, decoded_token):
    """Return the caller's email as found in the decoded ID token claims."""
    caller_email = decoded_token['email']
    return caller_email

References:

❤️ Is this article helpful?

Buy me a coffee ☕ or support my work via PayPal to keep this space 🖖 and ad-free.

Do send some 💖 to @d_luaz or share this article.

✨ By Desmond Lua

A dream boy who enjoys making apps, travelling and making youtube videos. Follow me on @d_luaz

👶 Apps I built

Travelopy - discover travel places in Malaysia, Singapore, Taiwan, Japan.