Prefect and SQLAlchemy are a data powerhouse duo. Prefect makes your workflows orchestrated and observable, and SQLAlchemy makes your databases a snap to handle! Get ready to experience the ultimate data "flow-chemistry"!
To set up a table and insert rows, use the `execute` and `execute_many` methods. Then use the `fetch_many` method to retrieve the rows in batches, calling it repeatedly until it returns no more rows.
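The batching loop works because repeated `fetch_many` calls with the same operation continue from the previous batch instead of re-running the query. The minimal sketch below reproduces that drain-until-empty pattern with Python's built-in `sqlite3` module and an in-memory database rather than a Prefect block. `BatchFetcher` and `drain` are hypothetical helpers written for illustration only; they are not part of `prefect_sqlalchemy`.

```python
import sqlite3


# Hypothetical helper, for illustration only: caches one cursor per SQL string
# so that repeated calls with the same operation continue where the previous
# call stopped instead of executing the query again.
class BatchFetcher:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._cursors: dict[str, sqlite3.Cursor] = {}

    def fetch_many(self, operation: str, size: int) -> list[tuple]:
        cursor = self._cursors.get(operation)
        if cursor is None:
            # First call for this operation: execute it once and remember the cursor.
            cursor = self._conn.execute(operation)
            self._cursors[operation] = cursor
        return cursor.fetchmany(size)


def drain(conn: sqlite3.Connection, operation: str, size: int = 2) -> list[list[tuple]]:
    """Collect every batch until an empty batch signals the end of the results."""
    fetcher = BatchFetcher(conn)
    batches = []
    while True:
        rows = fetcher.fetch_many(operation, size)
        if not rows:  # empty list: the result set is exhausted
            break
        batches.append(rows)
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (name varchar, address varchar)")
conn.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    [
        {"name": "Marvin", "address": "Highway 42"},
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Highway 42"},
    ],
)
batches = drain(conn, "SELECT * FROM customers ORDER BY rowid")
print(batches)
# [[('Marvin', 'Highway 42'), ('Ford', 'Highway 42')], [('Unknown', 'Highway 42')]]
```

The stop condition is the same one the `fetch_data` task uses: an empty batch means there is nothing left to read.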
Using `SqlAlchemyConnector` as a context manager ensures that the SQLAlchemy engine and any open connections are closed properly once you leave the `with` block, even if an error is raised inside it.
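That guarantee comes from Python's context-manager protocol: `__exit__` runs whether the `with` block finishes normally or raises. The sketch below illustrates this with a hypothetical `FakeConnector` class, a stand-in that only records whether it was closed; it is not the real `SqlAlchemyConnector`.

```python
from types import TracebackType
from typing import Optional, Type


class FakeConnector:
    """Hypothetical stand-in that only tracks whether it has been closed."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "FakeConnector":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Called on normal exit and when an exception leaves the block;
        # returning None lets any exception keep propagating.
        self.close()

    def close(self) -> None:
        self.closed = True

    def execute(self, operation: str) -> None:
        if self.closed:
            raise RuntimeError("connector is already closed")
        if "FAIL" in operation:
            raise ValueError(f"simulated error in: {operation}")


ok_connector = FakeConnector()
with ok_connector:
    ok_connector.execute("SELECT 1")

failing_connector = FakeConnector()
try:
    with failing_connector:
        failing_connector.execute("SELECT FAIL")
except ValueError:
    pass  # the error still propagates, but cleanup has already run

print(ok_connector.closed, failing_connector.closed)  # True True
```

Because `__exit__` does not suppress the exception, the caller still sees the failure, while the resources are released either way.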
```python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER")
```
The same flow can also be written with async tasks and flows, typically with the block configured to use an async driver (for example `postgresql+asyncpg`):

```python
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
async def setup_table(block_name: str) -> None:
    async with await SqlAlchemyConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    async with await SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
async def sqlalchemy_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    asyncio.run(sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER"))
```
We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
The tasks in this library are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.