Firestore Python With Multiprocessing [Solution]

November 22, 2020

Previously, I ran into issues when using Firestore with multiprocessing because of an SSL error:

E1121 21:29:51.425705802   50473 ssl_transport_security.cc:507] Corruption detected.
E1121 21:29:51.425745264   50473 ssl_transport_security.cc:483] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1121 21:29:51.425756828   50473 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED

I finally found a solution after various attempts:

  • You need to create an App instance for each process.
import time
import multiprocessing as mp
import queue

import firebase_admin
from firebase_admin import credentials, firestore, auth

def run(uid, cred):
    """Worker-process entry point: print one user's bots, then idle forever.

    A dedicated, named Firebase app is initialised inside this process rather
    than reusing the parent's app; per the accompanying write-up, sharing the
    app across processes is what triggered the SSL corruption errors.

    Args:
        uid: ID of the document in the ``user`` collection. Also used to
            build the app name ``user-<uid>``.
        cred: Credential object handed to ``firebase_admin.initialize_app``.
    """
    print(f"run.START={uid}")

    # create app for new process
    firebase_app = firebase_admin.initialize_app(cred, name=f"user-{uid}")
    try:
        db = firestore.client(app=firebase_app)

        user_ref = db.collection('user').document(uid)
        docs = user_ref.collection('bot').stream()
        for _doc in docs:
            print(f"bot: {_doc.id}, {_doc.get('name')}")

        # Heartbeat loop; it only ends when an exception (for example
        # KeyboardInterrupt) is raised in this process.
        while True:
            print('u', end='', flush=True)

            time.sleep(1)
    finally:
        # The loop above never falls through, so the cleanup has to live in
        # a ``finally`` clause to be reachable at all.
        firebase_admin.delete_app(firebase_app)


# Hand-off queue: the snapshot callback puts change lists here and the main
# loop takes them off again.
user_changes = queue.Queue()


def on_user_snapshot(doc_snapshot, changes, read_time):
    """Snapshot callback for the ``user`` collection.

    Prints the id and ``name`` field of every document in the snapshot, then
    enqueues the accompanying change list on ``user_changes``.

    Args:
        doc_snapshot: Iterable of document snapshots; each must expose ``id``
            and ``get(field)``.
        changes: Change list forwarded untouched to ``user_changes``.
        read_time: Unused here; accepted because the listener passes it.
    """
    # NOTE(review): presumably invoked off the main thread by the Firestore
    # watch machinery, which would explain the queue hand-off -- confirm.
    for snapshot in doc_snapshot:
        doc_id = snapshot.id
        doc_name = snapshot.get('name')
        print(f"user: {doc_id}, {doc_name}")

    user_changes.put(changes)


def main():
    """Watch the ``user`` collection and start one worker process per new user.

    Initialises the default Firebase app for the parent process, subscribes
    ``on_user_snapshot`` to the ``user`` collection, and then polls
    ``user_changes`` once per second. For every change whose type is
    ``ADDED`` a new ``multiprocessing.Process`` running ``run`` is started
    with the document id and the shared credential object.

    The loop runs until an exception (for example ``KeyboardInterrupt``)
    interrupts it; the snapshot listener is unsubscribed on the way out.
    """
    cred = credentials.Certificate('secret/firebase-adminsdk.json')

    # create app for main thread
    firebase_app = firebase_admin.initialize_app(cred)
    db = firestore.client(app=firebase_app)

    user_ref = db.collection('user')

    doc_watch = user_ref.on_snapshot(on_user_snapshot)
    try:
        while True:
            print('.', end='', flush=True)

            # The 1-second timeout doubles as the loop's pacing, so no
            # separate sleep is needed.
            try:
                changes = user_changes.get(timeout=1)
            except queue.Empty:
                continue

            for _change in changes:
                doc = _change.document
                if _change.type.name == 'ADDED':
                    # Only the id and credentials are handed over; the
                    # worker builds its own app and client from them.
                    p = mp.Process(target=run, args=(doc.id, cred))
                    p.start()
    finally:
        # The loop never falls through, so unsubscribing must happen in a
        # ``finally`` clause to be reachable.
        doc_watch.unsubscribe()



# Script entry point: start the watcher only when this file is run directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Output

.user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
.run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
bot: GQdpffr5qkjgFwwjj6B5, bot02
bot: QwhY8fQ2lsmBGdvcWw7E, bot01
u.u.u.u.u.u

References:

This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.