Firestore Python Multiprocessing Get Realtime Update (on_snapshot) [Problem]

November 21, 2020
ssl_transport_security.cc:507] Corruption detected.

Note

Does the Python Client for Google Cloud Firestore work with multiprocessing?

According to the documentation:

Because this client uses grpcio library, it is safe to share instances across threads. In multiprocessing scenarios, the best practice is to create client instances after the invocation of os.fork() by multiprocessing.Pool or multiprocessing.Process.

Environment

Ubuntu 20.04.1 LTS
Python 3.8.6 
firebase-admin==4.4.0
google-cloud-core==1.4.3
grpcio==1.33.2

stream

  • Use the main thread to stream the list of users
  • Launch a process for each user, which streams the list of bots belonging to that user

import time
import multiprocessing as mp
import queue

import firebase_admin
from firebase_admin import credentials, firestore, auth

# use fork as per the Firestore docs
mp.set_start_method('fork')

def run(user_ref):
    print(f"run.START={user_ref.id}")
    db = firestore.client()
    user_ref = db.collection('user').document(user_ref.id)
    docs = user_ref.collection('bot').stream()
    for _doc in docs:
        print(f"bot: {_doc.id}, {_doc.get('name')}")

    while True:
        print('u', end='', flush=True)

        time.sleep(1)

def main():
    cred = credentials.Certificate('secret/firebase-adminsdk.json')
    firebase_admin.initialize_app(cred)

    db = firestore.client()
    user_ref = db.collection('user')

    docs = user_ref.stream()
    processes = []
    for doc in docs:
        print(f"user: {doc.id}, {doc.get('name')}")
        p = mp.Process(target=run, args=(doc.reference,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

It works fine most of the time.

user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
bot: GQdpffr5qkjgFwwjj6B5, bot02
bot: QwhY8fQ2lsmBGdvcWw7E, bot01
uuuuuuuu

Sometimes it runs into the following error:

user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
E1121 21:19:27.795954422   49296 ssl_transport_security.cc:507] Corruption detected.
E1121 21:19:27.796283971   49296 ssl_transport_security.cc:483] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
E1121 21:19:27.796319418   49296 ssl_transport_security.cc:483] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
E1121 21:19:27.796339497   49296 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED
E1121 21:19:27.796561933   49296 ssl_transport_security.cc:534] SSL_write failed with error SSL_ERROR_SSL.

on_snapshot

The problem is more consistent and easier to reproduce with on_snapshot, because the connection is kept open.

import time
import multiprocessing as mp
import queue

import firebase_admin
from firebase_admin import credentials, firestore, auth

mp.set_start_method('fork')


def run(user_ref):
    print(f"run.START={user_ref.id}")
    db = firestore.client()
    user_ref = db.collection('user').document(user_ref.id)
    docs = user_ref.collection('bot').stream()
    for _doc in docs:
        print(f"bot: {_doc.id}, {_doc.get('name')}")

    while True:
        print('u', end='', flush=True)

        time.sleep(1)


# user_changes = mp.Queue()
user_changes = queue.Queue()

def on_user_snapshot(doc_snapshot, changes, read_time):
    for _doc in doc_snapshot:
        print(f"user: {_doc.id}, {_doc.get('name')}")

    user_changes.put(changes)


def main():
    # global db
    cred = credentials.Certificate('secret/firebase-adminsdk.json')
    firebase_admin.initialize_app(cred)

    db = firestore.client()
    user_ref = db.collection('user')

    doc_watch = user_ref.on_snapshot(on_user_snapshot)
    while True:
        print('.', end='', flush=True)

        try:
            changes = user_changes.get(timeout=1)
        except queue.Empty:
            changes = None
        if changes:
            for _change in changes:
                doc = _change.document
                if _change.type.name == 'ADDED':
                    p = mp.Process(target=run, args=(doc.reference,))
                    p.start()

                    # run(doc.reference)
        
        # time.sleep(1)
    # Not reached in this reproduction: the loop above never exits.
    doc_watch.unsubscribe()


if __name__ == "__main__":
    main()

Error

.user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
.run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
E1121 21:29:51.425705802   50473 ssl_transport_security.cc:507] Corruption detected.
E1121 21:29:51.425745264   50473 ssl_transport_security.cc:483] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1121 21:29:51.425756828   50473 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED
........

It works fine if doc_watch.unsubscribe() is called before launching the new process.
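The workaround noted above can be sketched as a small helper that closes the listener, starts the child, and then reopens the listener in the parent. This is only an illustration of the ordering: launch_with_listener_closed, launch and resubscribe are hypothetical names, and only unsubscribe() comes from the scripts in this post.

```python
def launch_with_listener_closed(doc_watch, launch, resubscribe):
    """Close the listener, start the child, then reopen the listener.

    doc_watch   -- the object returned by on_snapshot()
    launch      -- hypothetical zero-argument callable that starts the child
                   process and returns it
    resubscribe -- hypothetical zero-argument callable that calls
                   on_snapshot() again and returns the new watch
    """
    doc_watch.unsubscribe()  # no open stream while the child is created
    try:
        child = launch()
    finally:
        # Reopen the listener in the parent even if launching failed.
        new_watch = resubscribe()
    return child, new_watch
```

One caveat with this approach: a freshly attached listener starts with an initial snapshot in which the existing documents are reported as ADDED again, so the caller would need to remember which user ids already have a process.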

Troubleshoot

  • Whether or not mp.set_start_method('fork') is called makes no difference
  • The OpenSSL version is OpenSSL 1.1.1f 31 Mar 2020; upgrading to OpenSSL 1.1.1h 22 Sep 2020 doesn't help
  • I tried running on another PC with Ubuntu 18.04; the same problem persists
  • I suspected grpc, so I tried reverting grpcio to 1.32.0 and 1.31.0 (the current version is 1.33.2), but the problem persists
  • I tried building grpc from source using OpenSSL, yet the problem persists
  • I tried using from google.cloud import firestore directly, without firebase-admin; this caused more problems, including segmentation faults
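When swapping versions like this, it helps to record exactly what each run used. The following is a small sketch using only the standard library; collect_versions is a hypothetical helper name and the default package list is taken from the Environment section. Note that ssl.OPENSSL_VERSION reports the OpenSSL that Python's ssl module is linked against, which is not necessarily the TLS library used by a prebuilt grpcio wheel.

```python
import platform
import ssl
from importlib import metadata


def collect_versions(packages=("firebase-admin", "google-cloud-core", "grpcio")):
    """Return a dict of interpreter, OpenSSL and package versions."""
    versions = {
        "python": platform.python_version(),
        # OpenSSL that the ssl module is linked against.
        "openssl (ssl module)": ssl.OPENSSL_VERSION,
    }
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions


if __name__ == "__main__":
    for name, version in collect_versions().items():
        print(f"{name}: {version}")
```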

Solution

2020-11-22: Finally found a solution.

This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.