I'm kinda new to the SQL world, but I was following a tutorial called Optimizing pandas.read_sql for Postgres. The thing is, I'm working with a big dataset, similar to the example in the tutorial, and I need a faster way to execute my query and turn it into a DataFrame. There, they use this function:
def read_sql_tmpfile(query, db_engine):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
            query=query, head="HEADER"
        )
        conn = db_engine.raw_connection()
        cur = conn.cursor()
        cur.copy_expert(copy_sql, tmpfile)  # I want to replicate this
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df
And I tried to replicate it, like this:
def read_sql_tmpfile(query, connection):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
            query=query, head="HEADER"
        )
        cur = connection.cursor()
        cur.copy_expert(copy_sql, tmpfile)
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df
The thing is, cursor.copy_expert comes from the psycopg2 library for PostgreSQL, and I can't find a way to do the same thing with pymysql. Is there any way to do this? What should I do? Thanks
MySQL has SELECT ... INTO OUTFILE 'file_name', but "file_name cannot be an existing file", so there is no way to use a tempfile.TemporaryFile(). Ideally, a solution would handle any SELECT * FROM table style query. But even something limited to turning a table into a CSV file would be useful.

You can pretty easily write your file to /tmp, which gets cleared between reboots. You can also add your own decorator/context manager to apply similar niceties as those you get from tempfile.TemporaryFile. A quick example would be something like this...
import os

class SQLGeneratedTemporaryFile:
    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        # run your query and write to your file with the name `self.filename`
        return self

    def __exit__(self, *exc):
        # remove the file on exit, like tempfile.TemporaryFile would
        os.unlink(self.filename)
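One way the skeleton above might be filled in, assuming a pymysql connection whose user has the FILE privilege and a MySQL server running on the same machine (both assumptions, and necessary because INTO OUTFILE writes the file on the server side):

import os
import pandas as pd

class SQLDumpTemporaryFile:
    # Hypothetical variant: dump `query` to `filename` on enter, delete the file on exit
    def __init__(self, filename, connection, query):
        self.filename = filename
        self.connection = connection
        self.query = query

    def __enter__(self):
        with self.connection.cursor() as cur:
            # INTO OUTFILE refuses to overwrite an existing file and writes
            # tab-separated rows with no header by default
            cur.execute(f"{self.query} INTO OUTFILE '{self.filename}'")
        return self

    def __exit__(self, *exc):
        os.unlink(self.filename)

# Usage (all names are placeholders):
# with SQLDumpTemporaryFile('/tmp/mytable.tsv', connection, 'SELECT * FROM mytable') as tmp:
#     df = pd.read_csv(tmp.filename, sep='\t', header=None)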
To figure out which of these answers was fastest, I benchmarked each of them on a synthetic dataset. This dataset consisted of 100MB of time-series data, and 500MB of text data. (Note: this is measured using Pandas, which heavily penalizes small objects versus data which can be represented in NumPy.)
I benchmarked 5 methods:

- naive: plain pandas.read_sql()
- tofile: write the CSV to a temporary file, then read it with pandas.read_csv()
- pipe: stream the CSV from the mysql client through a pipe into pandas.read_csv() (shown below)
- pipe_no_fcntl: the same as pipe, but without the fcntl() call that enlarges the pipe buffer
- sftp: write the file on the DB server and fetch it over SFTP before reading it (essentially @MikeF's approach)

All methods were tried seven times, in random order. In the following tables, a lower score is better.
Time series benchmark:
| Method | Time (s) | Standard Error (s) |
|---|---|---|
| pipe | 6.719870 | 0.064610 |
| pipe_no_fcntl | 7.243937 | 0.104802 |
| tofile | 7.636196 | 0.125963 |
| sftp | 9.926580 | 0.171262 |
| naive | 11.125657 | 0.470146 |
Text benchmark:
| Method | Time (s) | Standard Error (s) |
|---|---|---|
| pipe | 8.452694 | 0.217661 |
| tofile | 9.502743 | 0.265003 |
| pipe_no_fcntl | 9.620349 | 0.420255 |
| sftp | 12.189046 | 0.294148 |
| naive | 13.769322 | 0.695961 |
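For reference, the timing procedure described above (seven runs per method, in shuffled order, then mean and standard error) can be sketched roughly like this; the methods mapping, query, and database name are assumptions rather than the exact benchmark script:

import random
import time

import numpy as np

def benchmark(methods, query, database, repeats=7):
    # `methods` maps a name to a function taking (query, database) and returning
    # a DataFrame, e.g. {'pipe': read_sql_pipe, 'naive': read_sql_naive, ...}
    timings = {name: [] for name in methods}
    for _ in range(repeats):
        order = list(methods)
        # Shuffle so caching and server load are spread evenly across methods
        random.shuffle(order)
        for name in order:
            start = time.perf_counter()
            methods[name](query, database)
            timings[name].append(time.perf_counter() - start)
    for name, ts in sorted(timings.items(), key=lambda kv: np.mean(kv[1])):
        ts = np.array(ts)
        # Mean and standard error of the mean, as reported in the tables above
        print(f"{name}: {ts.mean():.6f} s ± {ts.std(ddof=1) / np.sqrt(len(ts)):.6f} s")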
This is the pipe method, which was fastest.
import os
import pandas as pd
import subprocess
import tempfile
import time
import fcntl

db_server = '...'
F_SETPIPE_SZ = 1031

def read_sql_pipe(query, database):
    args = ['mysql', f'--login-path={db_server}', database, '-B', '-e', query]
    try:
        # Run mysql and capture output
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    except FileNotFoundError:
        # MySQL is not installed. Raise a better error message.
        raise Exception("The mysql command is not installed. Use brew or apt to install it.") from None

    # Raise amount of CSV data buffered up to 1MB.
    # This is a Linux-only syscall.
    fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)

    df = pd.read_csv(proc.stdout, delimiter='\t')

    retcode = proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(
            retcode, proc.args, output=proc.stdout, stderr=proc.stderr
        )

    return df
The basic idea is to use the subprocess module to invoke mysql, with the stdout of MySQL being fed to a pipe. A pipe is a file-like object, which can be directly passed to pd.read_csv(). The MySQL process creates the CSV concurrently with Pandas reading the CSV, so this leads to an advantage over the method which writes the entire file before Pandas starts reading it.

A note about fcntl: fcntl is useful here because the amount of data which can be buffered in the pipe is limited to 64kB by default. I found that raising this to 1MB led to a ~10% speedup. If this is unavailable, a solution which writes the CSV to a file may outperform the pipe method.
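If F_SETPIPE_SZ is unavailable (it is Linux-only; macOS and Windows do not support it), one option is to treat the buffer enlargement as best-effort instead of failing. A minimal sketch, assuming the rest of read_sql_pipe stays as above:

def try_enlarge_pipe(fd, size=1 << 20):
    # Best-effort: enlarge the pipe buffer on Linux, silently keep the
    # default buffer size on platforms without F_SETPIPE_SZ.
    try:
        import fcntl
        fcntl.fcntl(fd, 1031, size)  # 1031 == F_SETPIPE_SZ on Linux
    except (ImportError, OSError):
        pass

# In read_sql_pipe, the fcntl call would then become:
# try_enlarge_pipe(proc.stdout.fileno())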
This solution is most similar to @MikeF's solution, so they get the bounty.
The dataset was generated with the following script.
import pandas as pd
import numpy as np
from english_words import get_english_words_set
np.random.seed(42)

import util

def gen_benchmark_df(data_function, limit):
    i = 0
    df = data_function(i)
    i += 1
    while df.memory_usage(deep=True).sum() < limit:
        df = pd.concat([df, data_function(i)], ignore_index=True)
        i += 1
    # Trim excess rows
    row_count = len(df.index)
    data_size_bytes = df.memory_usage(deep=True).sum()
    row_count_needed = int(row_count * (limit / data_size_bytes))
    df = df.head(row_count_needed)
    return df

def gen_ts_chunk(i):
    rows = 100_000
    return pd.DataFrame({
        'run_id': np.random.randint(1, 1_000_000),
        'feature_id': np.random.randint(1, 1_000_000),
        'timestep': np.arange(0, rows),
        'val': np.cumsum(np.random.uniform(-1, 1, rows))
    })

def gen_text_chunk(i):
    rows = 10_000
    words = list(get_english_words_set(['web2'], lower=True))
    text_strings = np.apply_along_axis(lambda x: ' '.join(x), axis=1, arr=np.random.choice(words, size=(rows, 3)))
    return pd.DataFrame({
        'id': np.arange(i * rows, (i + 1) * rows),
        'data': text_strings
    })

dataset_size = 1e8
con = util.open_engine()
timeseries_df = gen_benchmark_df(gen_ts_chunk, dataset_size)
timeseries_df.to_sql('timeseries', con=con, if_exists='replace', index=False, chunksize=10_000)

dataset_size = 5e8
text_df = gen_benchmark_df(gen_text_chunk, dataset_size)
text_df.to_sql('text', con=con, if_exists='replace', index=False, chunksize=10_000)
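util here is my own helper module; open_engine() just returns a SQLAlchemy engine for to_sql() to write through. A hypothetical stand-in (connection string and credentials are placeholders) could be as simple as:

# util.py -- hypothetical stand-in for the helper module above
import sqlalchemy

def open_engine():
    # to_sql() accepts any SQLAlchemy engine; pymysql is used as the driver here
    return sqlalchemy.create_engine(
        'mysql+pymysql://user:password@db-server:3306/benchmark'
    )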
Assuming that Nick's question is
How can I create a CSV file on the client from a MySQL table?
At a commandline prompt do
mysql -u ... -p -h ... dbname -e '...' >localfile.csv
where the executable statement is something like
SELECT col1, col2, col3, col4
FROM mytable
Notes:

- The command prompt: cmd on Windows; some 'terminal' app on *nix.
- dbname has the effect of "use dbname;".
- WHERE (etc) can be included as needed.
- SHOW ... acts very much like SELECT.
- All of this is done with the command-line tool mysql.

Example (with -u, -p, and -h omitted):
# mysql -e "show variables like 'max%size'" | tr '\t' ','
Variable_name,Value
max_binlog_cache_size,18446744073709547520
max_binlog_size,104857600
max_binlog_stmt_cache_size,18446744073709547520
max_heap_table_size,16777216
max_join_size,18446744073709551615
max_relay_log_size,0
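If the end goal is a DataFrame rather than a file, note that the redirected output (localfile.csv in the command above) is actually tab-separated batch output with a header row, despite the .csv name, unless you convert it with tr as shown. Reading it back would look something like this (a sketch, not part of the original answer):

import pandas as pd

# mysql's non-interactive/batch output is tab-separated with a header line
df = pd.read_csv('localfile.csv', sep='\t')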
As mentioned in the comments, and in this answer, you are looking for SELECT ... INTO OUTFILE.
Here is a small (untested) example, based on your question:
def read_sql_tmpfile(query, connection):
    # Create tmp file name without creating the file
    # (_get_candidate_names() is a private helper of the tempfile module)
    tmp_dir = tempfile.mkdtemp()
    tmp_file_name = os.path.join(tmp_dir, next(tempfile._get_candidate_names()))

    # Copy data into temporary file
    # (INTO OUTFILE writes on the DB server, so this assumes the server runs locally)
    copy_sql = "{query} INTO OUTFILE '{outfile}'".format(
        query=query, outfile=tmp_file_name
    )
    cur = connection.cursor()
    cur.execute(copy_sql)

    # Read data from file
    # (INTO OUTFILE's default format is tab-separated with no header line)
    df = pandas.read_csv(tmp_file_name, sep='\t', header=None)

    # Cleanup
    os.remove(tmp_file_name)

    return df
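As a follow-up to the sketch above: since INTO OUTFILE defaults to tab-separated, unquoted rows without a header, you can alternatively ask MySQL for genuine CSV output and keep pandas.read_csv closer to its defaults. A variant of the copy_sql line, using standard MySQL export options (untested like the original):

# Ask MySQL for comma-separated, quoted output instead of the tab-separated default
copy_sql = (
    "{query} INTO OUTFILE '{outfile}' "
    "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
    "LINES TERMINATED BY '\\n'"
).format(query=query, outfile=tmp_file_name)

df = pandas.read_csv(tmp_file_name, header=None)  # still no header line in the file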
Using with tempfile would create the file. I updated my answer to just get a temp file name, but not allocate it.

I'm aware that the question is basically answered by waynetech's comment. But I was interested, and the details and implications are not always obvious, so here is the tested, copy-pastable solution.
Since the output file ends up on the DB server, the solution involves handling the temp directory on the server and transferring the file to the client. For the sake of simplicity I used SSH and SFTP for this. This assumes that the SSH keys of both machines have been exchanged beforehand. The remote file transfer and handling may be easier with a Samba share or something like that.

@Nick ODell: Please give this solution a chance and do a benchmark! I'm pretty sure the copy overhead isn't significant for larger amounts of data.
import os
import subprocess

import pandas

# `username` (SSH user on the DB host) and `db_server` are assumed to be defined elsewhere

def read_sql_tmpfile(query, connection):
    df = None

    # Create unique temp directory on server side
    cmd = "mktemp -d"
    (out_mktemp, err) = subprocess.Popen(f'ssh {username}@{db_server} "{cmd}"', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    if err or not out_mktemp:
        return

    # remove additional white space around the output
    tmp_dir = out_mktemp.decode().strip()

    # The following command should be made superfluous by tweaking the group memberships
    # to grant the `mysql` user full access to the directory created by the user which executes the `mktemp` command
    cmd = f"chmod 777 -R {tmp_dir}"
    res = os.system(f'ssh {username}@{db_server} "{cmd}"')
    if res:
        return

    try:
        remote_tmp_file = f'{tmp_dir}/sql_tmpfile'

        # remember: the DB connection's user needs the `FILE` privilege
        # think about SQL injection; pass MySQL parameters in the query and a corresponding
        # parameter list to this function if appropriate
        copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"
        cur = connection.cursor()
        cur.execute(copy_sql)

        local_tmp_file = os.path.basename(remote_tmp_file)
        cmd = f"sftp {username}@{db_server}:{remote_tmp_file} {local_tmp_file}"
        res = os.system(cmd)

        if not res and os.path.isfile(local_tmp_file):
            try:
                df = pandas.read_csv(local_tmp_file)
            finally:
                # cleanup local temp file
                os.remove(local_tmp_file)
    finally:
        # cleanup remote temp dir
        cmd = f"rm -R {tmp_dir}"
        os.system(f'ssh {username}@{db_server} "{cmd}"')

    return df
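For completeness, the function assumes a few names defined elsewhere in the script; a hypothetical setup (user names, host, and credentials are placeholders) might look like this:

import pymysql

username = 'benchmark_user'       # SSH user on the DB host (keys exchanged beforehand)
db_server = 'db-server.internal'  # host running both MySQL and sshd

# DB connection whose user has the FILE privilege
connection = pymysql.connect(
    host=db_server, user='db_user', password='...', database='benchmark'
)

df = read_sql_tmpfile('SELECT * FROM timeseries', connection)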
It would be interesting to see how variations of the copy_sql line would perform. Anyway, this is almost a full-fledged research project, a kind of "Optimizing pandas.read_sql for MySQL".

tail -f could be used to stream the file, but that creates a race condition: the file is not created as soon as the MySQL statement is sent, it could be delayed.

Duration(export) + Duration(read) is the baseline, and reading in can only start after the export. However, when reading in via the share, Duration(copy) and Duration(read) could merge and decrease the overall time.