@@ -898,21 +898,30 @@ def sqlite_buildin_types(sqlite_buildin, types_data):
898
898
@pytest.fixture(scope="session")
def worker_name(request):
    """
    Return a name unique to the current xdist worker.

    Used to isolate tests for parallelization: each worker creates and
    accesses its own SQL database named after this value.

    :return: Name to use for creating/accessing an isolated SQL database
    :rtype: str
    """
    xdist_id = xdist.get_xdist_worker_id(request)
    return xdist_id
907
907
908
908
909
909
@pytest .fixture (scope = "session" )
910
910
def create_engines ():
911
- # Indirectly import dependencies. To avoid being picked up by depdency scanning software.
911
+ """
912
+ Fixture factory. Returns a list of lambda functions.
913
+ :return: create_engine_commands, a list of lambda functions that build an SQLAlchemy engine
914
+ :rtype: list[function, function]
915
+
916
+ :mockup:
917
+ create_engine_commands = [
918
+ MySQL,
919
+ Postgres,
920
+ ]
921
+ """
922
+ # Indirectly import dependencies. To avoid being picked up by dependency scanning software.
912
923
sqlalchemy = pytest .importorskip ("sqlalchemy" )
913
924
pymysql = pytest .importorskip ("pymysql" )
914
-
915
- # Round robin creation of DB connections.
916
925
create_engine_commands = [
917
926
lambda : sqlalchemy .create_engine ("mysql+pymysql://root@localhost:3306/pandas" , connect_args = {"client_flag" : pymysql .constants .CLIENT .MULTI_STATEMENTS }, poolclass = sqlalchemy .pool .NullPool ),
918
927
lambda : sqlalchemy .create_engine ("postgresql+psycopg2://postgres:postgres@localhost:5432/pandas" , poolclass = sqlalchemy .pool .NullPool , isolation_level = "AUTOCOMMIT" )
@@ -921,12 +930,78 @@ def create_engines():
921
930
922
931
923
932
@pytest.fixture(scope="session")
def build_db_string(worker_name):
    """
    Per-engine CREATE DATABASE statements for this worker's isolated DBs.

    Index order matches ``create_engines``: MySQL first, Postgres second.

    :return: list of CREATE DATABASE query strings, one per SQL engine
    :rtype: list[str, str]

    :mockup:
        queries_by_engine = [
            MySQL,
            Postgres,
        ]
    """
    # MySQL supports IF NOT EXISTS; Postgres CREATE DATABASE does not.
    queries_by_engine = [
        f"""CREATE DATABASE IF NOT EXISTS pandas{worker_name} """,
        f"""CREATE DATABASE pandas{worker_name} """,
    ]
    return queries_by_engine
951
+
952
+
953
@pytest.fixture(scope="session")
def teardown_db_string(worker_name):
    """
    Per-engine DROP DATABASE statements for this worker's isolated DBs.

    Index order matches ``create_engines``: MySQL first, Postgres second.
    ``IF EXISTS`` keeps teardown idempotent: it does not raise when the
    database was never created, or was already dropped (e.g. by the
    orphan-DB finalizer running before session teardown).

    :return: teardown_db_string_query
    :rtype: list[str, str]

    :mockup:
        teardown_db_string_query = [
            MySQL,
            Postgres,
        ]
    """
    teardown_db_string_query = [
        f"""DROP DATABASE IF EXISTS pandas{worker_name} """,
        f"""DROP DATABASE IF EXISTS pandas{worker_name} """,
    ]
    return teardown_db_string_query
972
+
973
+
974
@pytest.fixture(scope="session")
def number_of_connections(create_engines, build_db_string, teardown_db_string):
    """
    Number of SQL engines participating in round-robin scheduling.

    Asserts parity between the engine factories and the build/teardown
    query lists, so every engine has exactly one build query and one
    teardown query.

    :return: len(build_db_string)
    :rtype: int
    """
    lengths = {len(create_engines), len(build_db_string), len(teardown_db_string)}
    assert len(lengths) == 1
    return len(build_db_string)
984
+
985
+
986
@pytest.fixture(scope="session")
def round_robin_order(worker_number, number_of_connections):
    """
    Round-robin ordering of engines for this worker, equalizing the
    connectivity burden between the SQL engines.

    :return: a modular ring, e.g. with 2 DBs, w1 gets [1,0],
        w2 gets [0,1], w3 gets [1,0], etc.
    :rtype: list[int]*number_of_connections
    """
    offsets = range(number_of_connections)
    return [(worker_number + offset) % number_of_connections for offset in offsets]
926
995
927
996
928
997
@pytest .fixture (scope = "session" )
929
998
def worker_number (worker_name ):
999
+ """
1000
+ Casts worker_name to an integer, making sure that with only one thread, or without xdist, DB connections are
1001
+ still made correctly.
1002
+ :return: worker_number, integer portion of worker_name, `1` if master.
1003
+ :rtype: int
1004
+ """
930
1005
if worker_name == 'master' :
931
1006
worker_number = 1
932
1007
else :
@@ -935,87 +1010,48 @@ def worker_number(worker_name):
935
1010
936
1011
937
1012
@pytest.fixture(scope="session")
def orphan_db_wrapper(request, create_engines, round_robin_order, teardown_db_string):
    """
    Registers a session finalizer that drops this worker's databases, so
    DBs are not orphaned even if the normal teardown path does not run.

    :return: None
    """
    sqlalchemy = pytest.importorskip("sqlalchemy")

    def take_care_of_orphan_dbs():
        # pytest calls finalizers with no arguments, so the fixture values
        # are captured by closure rather than taken as parameters.
        for rr_order in round_robin_order:
            engine = create_engines[rr_order]()
            with engine.connect() as conn:
                conn.execute(sqlalchemy.text(teardown_db_string[rr_order]))
            engine.dispose()

    # The pytest API is request.addfinalizer (no underscore); the previous
    # spelling `add_finalizer` raised AttributeError at fixture setup.
    request.addfinalizer(take_care_of_orphan_dbs)
1027
+
943
1028
944
1029
945
1030
@pytest.fixture(scope="session")
def build_and_teardown_dbs(create_engines, round_robin_order, build_db_string, teardown_db_string):
    """
    Creates this worker's per-engine databases before the session's tests
    run, and drops them again afterwards.

    Visits the engines in this worker's round-robin order, executing the
    matching build query (before yield) / teardown query (after yield)
    against each one.

    :return: None
    """
    sqlalchemy = pytest.importorskip("sqlalchemy")

    def _execute_per_engine(queries):
        # One short-lived connection per engine; the NullPool engines are
        # disposed immediately so no connections linger for the session.
        for rr_order in round_robin_order:
            engine = create_engines[rr_order]()
            with engine.connect() as conn:
                conn.execute(sqlalchemy.text(queries[rr_order]))
            engine.dispose()

    _execute_per_engine(build_db_string)
    yield
    _execute_per_engine(teardown_db_string)
951
1050
952
1051
953
1052
@pytest.fixture(scope="session", autouse=True)
def execution_point(build_and_teardown_dbs):
    """
    Autouse session anchor: requesting ``build_and_teardown_dbs`` here
    guarantees the per-worker databases are created before any test runs
    and torn down after the session ends, without individual tests having
    to request that fixture themselves.
    """
    yield
1019
1055
1020
1056
1021
1057
@@ -1104,11 +1140,8 @@ def prepare_db_setup(request, worker_name):
1104
1140
sqlalchemy_connectable_types + ["sqlite_buildin_types" ] + adbc_connectable_types
1105
1141
)
1106
1142
1107
- #TODO fix
1108
- @pytest .mark .parametrize ("conn" , [
1109
- #pytest.param("mysql_pymysql_engine", marks=pytest.mark.db),
1110
- pytest .param ("mysql_pymysql_conn" , marks = pytest .mark .db ),
1111
- ])
1143
+
1144
+ @pytest .mark .parametrize ("conn" , all_connectable )
1112
1145
def test_dataframe_to_sql (conn , test_frame1 , request ):
1113
1146
# GH 51086 if conn is sqlite_engine
1114
1147
conn = request .getfixturevalue (conn )
0 commit comments