Firestore Python With Multiprocessing [Solution]

NOTE: This solution is incomplete, as there is still a core dump issue.

I previously ran into problems using Firestore with multiprocessing because of SSL errors such as:

E1121 21:29:51.425705802   50473 ssl_transport_security.cc:507] Corruption detected.
E1121 21:29:51.425745264   50473 ssl_transport_security.cc:483] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1121 21:29:51.425756828   50473 secure_endpoint.cc:208]     Decryption error: TSI_DATA_CORRUPTED

After several attempts, I finally found a solution:

  • You need to create an App instance for each process.
import time
import multiprocessing as mp
import queue

import firebase_admin
from firebase_admin import credentials, firestore, auth


def run(uid, cred):
    print(f"run.START={uid}")

    # create app for new process
    firebase_app = firebase_admin.initialize_app(cred, name=f"user-{uid}")
    db = firestore.client(app=firebase_app)

    user_ref = db.collection('user').document(uid)
    docs = user_ref.collection('bot').stream()
    for _doc in docs:
        print(f"bot: {_doc.id}, {_doc.get('name')}")

    while True:
        print('u', end='', flush=True)
        time.sleep(1)

    firebase_admin.delete_app(firebase_app)


user_changes = queue.Queue()


def on_user_snapshot(doc_snapshot, changes, read_time):
    for _doc in doc_snapshot:
        print(f"user: {_doc.id}, {_doc.get('name')}")
    user_changes.put(changes)


def main():
    cred = credentials.Certificate('secret/firebase-adminsdk.json')

    # create app for main thread
    firebase_app = firebase_admin.initialize_app(cred)
    db = firestore.client(app=firebase_app)

    user_ref = db.collection('user')
    doc_watch = user_ref.on_snapshot(on_user_snapshot)

    while True:
        print('.', end='', flush=True)
        try:
            changes = user_changes.get(timeout=1)
        except queue.Empty:
            changes = None

        if changes:
            for _change in changes:
                doc = _change.document
                if _change.type.name == 'ADDED':
                    p = mp.Process(target=run, args=(doc.id, cred))
                    p.start()
                    # run(doc.reference)
                # time.sleep(1)

    doc_watch.unsubscribe()


if __name__ == "__main__":
    main()

Output

.user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
.run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
bot: GQdpffr5qkjgFwwjj6B5, bot02
bot: QwhY8fQ2lsmBGdvcWw7E, bot01
u.u.u.u.u.u
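
Since the note at the top says a core dump can still happen, one variation that may be worth trying is to start each worker with the "spawn" start method, so the child begins as a fresh interpreter instead of a fork of a parent that already holds open Firestore connections. The sketch below is untested against the crash and is not a confirmed fix. It assumes the credential file path is passed to the child (instead of the `cred` object, because spawn pickles its arguments) and that the child builds its own credential and App. The helper names `app_name` and `make_worker` are hypothetical.

```python
import multiprocessing as mp


def app_name(uid):
    # Same per-user App naming scheme as the code above.
    return f"user-{uid}"


def run(uid, cred_path):
    # Import and initialise Firebase inside the child process only,
    # so nothing Firebase-related is inherited from the parent.
    import firebase_admin
    from firebase_admin import credentials, firestore

    cred = credentials.Certificate(cred_path)
    firebase_app = firebase_admin.initialize_app(cred, name=app_name(uid))
    try:
        db = firestore.client(app=firebase_app)
        docs = db.collection('user').document(uid).collection('bot').stream()
        for _doc in docs:
            print(f"bot: {_doc.id}, {_doc.get('name')}")
    finally:
        # Runs even if the query raises, unlike code placed after `while True`.
        firebase_admin.delete_app(firebase_app)


def make_worker(uid, cred_path):
    # Assumption: "spawn" avoids sharing the parent's connections with the child.
    ctx = mp.get_context("spawn")
    # The process is returned unstarted; the caller decides when to start it.
    return ctx.Process(target=run, args=(uid, cred_path), name=app_name(uid))
```

In `main()`, the `mp.Process(...)` call would become `make_worker(doc.id, 'secret/firebase-adminsdk.json').start()`. With spawn, the child re-imports the main module, so this call has to stay under the `if __name__ == "__main__":` guard.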

✨ By Desmond Lua