Note
Does the Python Client for Google Cloud Firestore work with multiprocessing?
According to the documentation:
Because this client uses grpcio library, it is safe to share instances across threads. In multiprocessing scenarios, the best practice is to create client instances after the invocation of os.fork() by multiprocessing.Pool or multiprocessing.Process.
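The practice quoted above can be sketched without any Firestore dependency. In this minimal sketch, `FakeClient`, `worker` and `run_workers` are hypothetical names introduced only for illustration; `FakeClient` stands in for a gRPC-backed client and merely records the PID of the process that created it. It assumes a POSIX platform where the `fork` start method is available.

```python
import multiprocessing as mp
import os


class FakeClient:
    """Hypothetical stand-in for a gRPC-backed client (not a Firestore API)."""

    def __init__(self):
        # Remember which process opened the (pretend) connection.
        self.owner_pid = os.getpid()


def worker(results):
    # Create the client inside the child, i.e. after os.fork() has happened,
    # instead of reusing an instance inherited from the parent.
    client = FakeClient()
    results.put((os.getpid(), client.owner_pid))


def run_workers(count=2):
    ctx = mp.get_context("fork")  # assumption: POSIX platform, as in this post
    results = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(results,)) for _ in range(count)]
    for p in procs:
        p.start()
    # Drain the queue before joining so no child blocks on a full pipe.
    pairs = [results.get(timeout=10) for _ in procs]
    for p in procs:
        p.join()
    return pairs
```

Each `(pid, owner_pid)` pair returned by `run_workers()` should show that the client was created by the same child process that uses it, which is the property the documentation asks for.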
Environment
Ubuntu 20.04.1 LTS
Python 3.8.6
firebase-admin==4.4.0
google-cloud-core==1.4.3
google-cloud-firestore==2.0.1
grpcio==1.33.2
stream
- Use the main thread to stream the list of user documents
- Launch a process for each user, where it streams the list of bot documents belonging to the user
import time
import multiprocessing as mp
import queue
import firebase_admin
from firebase_admin import credentials, firestore, auth

# use fork as per the Firestore doc
mp.set_start_method('fork')

def run(user_ref):
    print(f"run.START={user_ref.id}")
    db = firestore.client()
    user_ref = db.collection('user').document(user_ref.id)
    docs = user_ref.collection('bot').stream()
    for _doc in docs:
        print(f"bot: {_doc.id}, {_doc.get('name')}")
    while True:
        print('u', end='', flush=True)
        time.sleep(1)

def main():
    cred = credentials.Certificate('secret/firebase-adminsdk.json')
    firebase_admin.initialize_app(cred)
    db = firestore.client()
    user_ref = db.collection('user')
    docs = user_ref.stream()
    processes = []
    for doc in docs:
        # use the loop variable `doc` (the flattened listing referenced `_doc`)
        print(f"user: {doc.id}, {doc.get('name')}")
        p = mp.Process(target=run, args=(doc.reference,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

if __name__ == "__main__":
    main()
It works fine most of the time.
user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
bot: GQdpffr5qkjgFwwjj6B5, bot02
bot: QwhY8fQ2lsmBGdvcWw7E, bot01
uuuuuuuu
Sometimes it runs into the following error:
user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
E1121 21:19:27.795954422 49296 ssl_transport_security.cc:507] Corruption detected.
E1121 21:19:27.796283971 49296 ssl_transport_security.cc:483] error:1e000065:Cipher functions:OPENSSL_internal:BAD_DECRYPT
E1121 21:19:27.796319418 49296 ssl_transport_security.cc:483] error:1000008b:SSL routines:OPENSSL_internal:DECRYPTION_FAILED_OR_BAD_RECORD_MAC
E1121 21:19:27.796339497 49296 secure_endpoint.cc:208] Decryption error: TSI_DATA_CORRUPTED
E1121 21:19:27.796561933 49296 ssl_transport_security.cc:534] SSL_write failed with error SSL_ERROR_SSL.
on_snapshot
The problem is more consistent and obvious when using on_snapshot, as the connection is kept open.
import time
import multiprocessing as mp
import queue
import firebase_admin
from firebase_admin import credentials, firestore, auth

mp.set_start_method('fork')

def run(user_ref):
    print(f"run.START={user_ref.id}")
    db = firestore.client()
    user_ref = db.collection('user').document(user_ref.id)
    docs = user_ref.collection('bot').stream()
    for _doc in docs:
        print(f"bot: {_doc.id}, {_doc.get('name')}")
    while True:
        print('u', end='', flush=True)
        time.sleep(1)

# user_changes = mp.Queue()
user_changes = queue.Queue()

def on_user_snapshot(doc_snapshot, changes, read_time):
    for _doc in doc_snapshot:
        print(f"user: {_doc.id}, {_doc.get('name')}")
    user_changes.put(changes)

def main():
    # global db
    cred = credentials.Certificate('secret/firebase-adminsdk.json')
    firebase_admin.initialize_app(cred)
    db = firestore.client()
    user_ref = db.collection('user')
    doc_watch = user_ref.on_snapshot(on_user_snapshot)
    while True:
        print('.', end='', flush=True)
        try:
            changes = user_changes.get(timeout=1)
        except queue.Empty:
            changes = None
        if changes:
            for _change in changes:
                doc = _change.document
                if _change.type.name == 'ADDED':
                    p = mp.Process(target=run, args=(doc.reference,))
                    p.start()
                    # run(doc.reference)
        # time.sleep(1)
    doc_watch.unsubscribe()

if __name__ == "__main__":
    main()
Error
.user: wNjJH04noUb7MehTWiyWRRVTY6n1, user-01
.run.START=wNjJH04noUb7MehTWiyWRRVTY6n1
E1121 21:29:51.425705802 50473 ssl_transport_security.cc:507] Corruption detected.
E1121 21:29:51.425745264 50473 ssl_transport_security.cc:483] error:100003fc:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_RECORD_MAC
E1121 21:29:51.425756828 50473 secure_endpoint.cc:208] Decryption error: TSI_DATA_CORRUPTED
........
It works fine if doc_watch.unsubscribe() is called before launching a new process.
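The observation above (close the listener, then launch the process) could be wrapped in a small helper. This is only a sketch of that workaround, not a confirmed fix: `spawn_with_watch_closed` is a hypothetical name, and it assumes `collection_ref` exposes `on_snapshot(callback)` and `watch` exposes `unsubscribe()`, as in the listing above.

```python
import multiprocessing as mp


def spawn_with_watch_closed(collection_ref, watch, callback, target, args=()):
    """Hypothetical helper: close the listener, fork, then listen again."""
    # Close the long-lived listener before forking, matching the
    # observation that the error disappears in that case.
    watch.unsubscribe()
    proc = mp.get_context("fork").Process(target=target, args=args)
    proc.start()
    # Re-open the listener in the parent only after the child has started.
    new_watch = collection_ref.on_snapshot(callback)
    return proc, new_watch
```

In the on_snapshot example it might be called as `p, doc_watch = spawn_with_watch_closed(user_ref, doc_watch, on_user_snapshot, run, (doc.reference,))`. One caveat: a fresh listener normally delivers an initial snapshot again, so the caller would likely need to track which users already have a process to avoid launching duplicates.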
Troubleshoot
- Using mp.set_start_method('fork') or not doesn't matter.
- The OpenSSL version is OpenSSL 1.1.1f 31 Mar 2020; upgrading to OpenSSL 1.1.1h 22 Sep 2020 doesn't help.
- I tried running on another PC with Ubuntu 18.04; the same problem persists.
- I suspected the problem was due to grpc: I tried reverting grpcio to 1.32.0 and 1.31.0 (the current version is 1.33.2), but the problem still persists.
- I tried building grpc from source using OpenSSL, yet the problem still persists.
- I tried using from google.cloud import firestore directly, without firebase-admin, which caused more problems, including segmentation faults.
Solution
2020-11-22: Finally found a solution.