Export Large SQL Query Result with Python pyodbc and dask Libraries

By: Hristo Hristov   |   Updated: 2023-01-03


Problem

If you are working with high-frequency time series data or recordings of a recurring event, your dataset may grow tremendously over time. The total row count could reach tens or even hundreds of millions. What options do you have as a data professional to export such a dataset to one or multiple CSV files?

Solution

Using Python, we can connect to a SQL Server database, query it, and then export the in-memory results. The challenge is the limited amount of memory available, which is determined by the machine and the environment settings. We will show how a typical go-to solution fails due to this limitation. Then we will show a solution that works, using Python with the pyodbc and dask libraries.

Solution Environment

I have created a database in Azure called TestDB with a single table UserAccessLogs. The table has been created with the following script:

CREATE TABLE [dbo].[UserAccessLogs](
    [RowId] [int] IDENTITY(1,1) NOT NULL,
    [IP] [varchar](20) NOT NULL,
    [UserId] [varchar](20) NOT NULL,
    [Timestamp] [datetime] NULL
) ON [PRIMARY]
GO
ALTER TABLE [dbo].[UserAccessLogs] ADD PRIMARY KEY CLUSTERED 
(
    [RowId] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
GO

The data were generated artificially and represent a basic user access log with an IP address, user id and timestamp. Here is a sample:

[Image: data sample]

Additionally, I have created a Jupyter notebook where I will run this experiment. I am running Python 3.10 in a conda virtual environment. First, I will do some imports and set some variables to connect to our database in Azure:

[Image: jupyter notebook configuration]
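The setup cell is only available as a screenshot. As a rough sketch of what such a cell might look like: the environment variable names, the placeholder values, and the helper build_odbc_connection_string below are assumptions for illustration, not the actual settings used in this tip.

```python
import os

# In the notebook, the libraries used later would also be imported here:
#   import pyodbc
#   import pandas as pd
#   import dask.dataframe as dd

# Hypothetical placeholder values; replace them with your own settings.
driver = '{ODBC Driver 17 for SQL Server}'
server = os.environ.get('SQL_SERVER', 'your-server.database.windows.net')
database = os.environ.get('SQL_DATABASE', 'TestDB')
username = os.environ.get('SQL_USERNAME', 'your-user')
password = os.environ.get('SQL_PASSWORD', 'your-password')


def build_odbc_connection_string(driver, server, database, username, password):
    """Assemble the same ODBC connection string that the examples concatenate inline."""
    return (f'DRIVER={driver};SERVER=tcp:{server};PORT=1433;'
            f'DATABASE={database};UID={username};PWD={password}')


conn_str = build_odbc_connection_string(driver, server, database, username, password)
```

Reading the credentials from environment variables keeps them out of the notebook itself. The resulting conn_str could then be passed to pyodbc.connect() instead of repeating the concatenation.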

With these variables ready, we can try importing our data.

Attempt 1 with pandas

We will use pyodbc to establish a connection to SQL Server and query the table. Then, we can use a pandas dataframe object to store the results of our query. First, let us find out (programmatically) the total number of rows we are dealing with. We will do this by using the with keyword to establish a context manager for our database connection. Inside that context, we will execute a simple query with COUNT():

total_rows = 0
with pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT COUNT(*) FROM UserAccessLogs')
        row = cursor.fetchone()
        total_rows = int(row[0])
        
print(total_rows)

The result is 81 million rows:

[Image: get row count]

The next order of business is reading these data. pyodbc has a cursor object, which represents a database cursor. This cursor has several methods we can consider:

  • fetchone(): irrelevant here, as it returns only one row per call.
  • fetchall(): not feasible, as it will try to load all 81 million rows into memory. It is well suited for scenarios with far fewer rows.
  • fetchmany(): feasible, as it accepts an argument for how many rows to return per call.
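To illustrate how fetchmany() hands back rows in batches, here is a small self-contained sketch. It uses Python's built-in sqlite3 module as a stand-in for pyodbc (both expose the DB-API 2.0 cursor interface). The table, the sample data, and the helper iter_batches are made up for the example.

```python
import sqlite3


def iter_batches(cursor, size):
    """Yield lists of rows from fetchmany() until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(size)
        if not rows:  # an empty list means there are no more rows
            break
        yield rows


# Build a tiny in-memory table with 10 rows (illustrative data only).
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE logs (RowId INTEGER PRIMARY KEY, UserId TEXT)')
conn.executemany('INSERT INTO logs (UserId) VALUES (?)',
                 [(f'user{i}',) for i in range(10)])

cursor = conn.execute('SELECT RowId, UserId FROM logs ORDER BY RowId')
batch_sizes = [len(batch) for batch in iter_batches(cursor, 4)]
conn.close()

print(batch_sizes)
```

Looping until fetchmany() returns an empty list means the last, smaller batch is picked up without knowing the row count in advance.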

To use fetchmany(), we must set a variable size, the number of rows to return per call. Then we can set a variable iterations, which represents how many times we will call fetchmany(). Naturally, that equals the total number of rows divided by the size. Here, iterations equals 162, meaning the loop will call fetchmany() 162 times, extracting 500,000 rows each time for a total of 81 million rows. If the row count is not an exact multiple of size, the integer division drops the remainder, so extra logic is needed to fetch the last, smaller batch.
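One simple way to account for a final partial batch is ceiling division. A minimal sketch, with count_batches as a hypothetical helper name:

```python
def count_batches(total_rows, size):
    """Number of fetchmany(size) calls needed to read total_rows rows."""
    # Integer ceiling division: rounds up when there is a remainder.
    return (total_rows + size - 1) // size


exact = count_batches(81_000_000, 500_000)      # row count is a multiple of size
with_rest = count_batches(81_000_001, 500_000)  # one extra row needs one more call

print(exact, with_rest)
```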

Next, we must create an empty dataframe called result_df. The goal is to avoid handling the rows one by one, which would be inefficient. Instead, inside the loop we build a second dataframe, current_df, from each batch of rows and append it to result_df. Note that the DataFrame.append() method was deprecated in pandas 1.4 and removed in pandas 2.0, where pd.concat() is its replacement:

size = 500000
iterations = total_rows // size  # note: integer division drops any final partial batch

# column names follow the order of the SELECT list below
columns = ['rowid', 'ip', 'userid', 'ts']
result_df = pd.DataFrame(columns=columns)

with pyodbc.connect('DRIVER='+driver+';SERVER=tcp:'+server+';PORT=1433;DATABASE='+database+';UID='+username+';PWD='+ password) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT RowId, IP, UserId, Timestamp FROM UserAccessLogs')

        for i in range(iterations):
            print(f'current iter: {i}')

            rows = cursor.fetchmany(size)
            current_df = pd.DataFrame.from_records(rows, columns=columns)
            # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
            result_df = pd.concat([result_df, current_df], ignore_index=True)

        print(result_df)

Here is what the code looks like:

[Image: load the data into a dataframe object]

Initially, it looks good. However, at iteration 50 (about a third of the way through the data) we get an out-of-memory exception, meaning the in-memory size of the target object result_df grew beyond what the machine could allocate:

[Image: out of memory exception]

Why did we get this error? As a rule of thumb, pandas requires 5 to 10 times as much RAM as the size of the dataset. In our case we have about 4.73 GB of data:

[Image: dataset size in GB]

My machine with 16GB of RAM is poorly suited to handle this in memory. A better configuration with 48 GB of free memory could do the trick, if you had one.
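The 48 GB figure is consistent with applying the 5 to 10 times rule of thumb to the 4.73 GB dataset. A quick back-of-the-envelope check (the variable names are only for illustration):

```python
dataset_gb = 4.73  # size of the data, as reported above

low_gb = dataset_gb * 5    # optimistic end of the rule of thumb
high_gb = dataset_gb * 10  # pessimistic end of the rule of thumb

print(f'{low_gb:.2f} GB to {high_gb:.2f} GB')
```

Even the optimistic estimate is well above the 16 GB available on the test machine.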

Attempt 2 with Dask

Dask is a flexible library for parallel computing in Python. It provides big data collections, such as dataframes, that extend common interfaces like pandas to larger-than-memory environments. Its read_sql_table() method works on whole tables rather than arbitrary text queries. It builds a lazy, partitioned dataframe over the table, so the table is not loaded into memory all at once, regardless of its size:

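import dask.dataframe as dd
from urllib.parse import quote_plus

driver = quote_plus('ODBC Driver 17 for SQL Server')  # no curly brackets; URL-encoded for the SQLAlchemy URL
connection_string = f'mssql+pyodbc://{username}:{quote_plus(password)}@{server}/{database}?driver={driver}'

data = dd.read_sql_table('UserAccessLogs',
                         connection_string,
                         index_col='RowId')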

You must provide the table name, the connection string, and an index column. This should typically be an indexed column in SQL (e.g., a numeric primary key). Dask will use it to define the partitioning of the table. Here is the result:

[Image: read sql table into a dask dataframe]

It took Dask a bit longer than a minute to partition the whole table. The output on the screenshot above also displays the schema of the table (IP, UserId, Timestamp) and the number of partitions (45). If needed the number of partitions can be set manually by using the npartitions parameter.

The next and last step is to export the data to CSV. We can do this with the to_csv method, which we call on the Dask dataframe. It is important to note that one file per partition will be created. On my system this operation took about 63 minutes to complete with the following script:

data.to_csv('export-*.csv')

The asterisk in the file name is a placeholder for the sequential file number. The result is 45 files, equal to the number of partitions:

[Image: list of exported files, up to file 45]

Additionally, with the single_file parameter set to True, we can output just one file. While that can be handy, two issues may occur:

  • The Excel row limit is 1,048,576 rows. The CSV format itself can hold more than that; however, Excel will not load rows beyond that limit, so they will not be accessible in the Excel app. To circumvent this problem, you can use Notepad++ or a similar editor that does not have a hard row limit.
  • Any error occurring during the extraction process (e.g., a network connectivity disruption or a SQL Server resource limitation) will cause the whole operation to fail.

Considering these two complications, you can also consider using Dask's read_sql_query() method. Instead of reading the whole table, it lets you specify a query (as a SQLAlchemy selectable rather than a raw SQL string). Thus, you can, for example, partition your data manually by using the query itself.
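One way to partition the data manually is to split the RowId range into contiguous blocks and issue one query per block. The sketch below only builds plain SQL strings, which could then be run and exported one at a time, for example with pyodbc. It assumes that RowId values run contiguously from 1 to 81,000,000; partition_ranges is a hypothetical helper and not part of dask or pyodbc.

```python
def partition_ranges(min_id, max_id, npartitions):
    """Split the inclusive id range [min_id, max_id] into contiguous, near-equal blocks."""
    total = max_id - min_id + 1
    ranges = []
    for i in range(npartitions):
        lower = min_id + (total * i) // npartitions
        upper = min_id + (total * (i + 1)) // npartitions - 1
        ranges.append((lower, upper))
    return ranges


# Assumed id range and partition count, mirroring the figures in this tip.
ranges = partition_ranges(1, 81_000_000, 45)

# One query per block; each can be exported to its own CSV file.
queries = [
    'SELECT RowId, IP, UserId, Timestamp FROM UserAccessLogs '
    f'WHERE RowId BETWEEN {lower} AND {upper}'
    for lower, upper in ranges
]

print(len(queries))
print(queries[0])
```

Because each block is exported independently, a failure in one block only requires rerunning that block rather than the whole export.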

Conclusion

Exporting data from SQL Server is straightforward and quick when there is not much data. An approach with a pandas dataframe and the pyodbc fetchmany() cursor method may work for about 1.5 GB of data on disk, considering the attempt failed when about a third of the data was loaded in memory. You should also keep in mind that the size of the data on disk may differ from its size in memory. However, when we enter big data territory, standard libraries and approaches may not always work. In such a case, the dask library can turn your workstation into your own parallel distributed environment that can handle hundreds of millions of rows. Finally, the performance of these operations (read_sql_table or read_sql_query) will depend on the database setup, the network bandwidth, and the machine running dask. Keep in mind that dask will keep a read lock on the database table while writing to the CSV file output.


About the author
Hristo Hristov is a Data Scientist and Power Platform engineer with more than 12 years of experience. Between 2009 and 2016 he was a web engineering consultant working on projects for local and international clients. Since 2017, he has been working for Atlas Copco Airpower in Flanders, Belgium where he has tackled successfully multiple end-to-end digital transformation challenges. His focus is delivering advanced solutions in the analytics domain with predominantly Azure cloud technologies and Python. Hristo's real passion is predictive analytics and statistical analysis. He holds a master's degree in Data Science and multiple Microsoft certifications covering SQL Server, Power BI, Azure Data Factory and related technologies.



Article Last Updated: 2023-01-03

Comments For This Article




Friday, February 3, 2023 - 4:19:37 AM - Hristo Hristov
Hi Joseph,

First of all make sure to import dask:
import dask.dataframe as dd

Then googling your error tells me to make sure the python script file you are running must not be named like any of the libraries used (e.g. dask.py). Lastly, I started with a clean Conda environment, can you do that too?

Wednesday, February 1, 2023 - 9:13:31 PM - joseph digeronimo
None of the code in the dask and pyodbc code works because I receive this error.
AttributeError: partially initialized module 'pyodbc' has no attribute 'connect' (most likely due to a circular import)

Do you have any idea why this is happening?













