Updates to the Pandas Alternatives article

pull/4/head
vaclavdekanovsky 2021-01-25 22:43:08 +01:00
parent 84bd1126d9
commit f3e73b33f8
4 changed files with 925 additions and 268 deletions

View File

@ -1,5 +1,13 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Julia Proof of Concept\n",
"In this notebook we explore processing of csv file in julia. We will load two files, join them, group by and aggregate and sort the results. In the end we run all the steps 7 times as a performance test. We also explore that julia needs to compile the code only once. "
]
},
{
"cell_type": "code",
"execution_count": 1,
@ -73,9 +81,10 @@
{
"data": {
"text/plain": [
"2-element Array{String,1}:\n",
"3-element Array{String,1}:\n",
" \"train_transaction.csv\"\n",
" \"train_identity.csv\""
" \"train_identity.csv\"\n",
" \"train_transaction_2.csv\""
]
},
"execution_count": 3,
@ -85,7 +94,7 @@
],
"source": [
"folder = \"/home/vaclav/Data/Kaggle/EEE-CIS_Fraud_Detection\"\n",
"files = [\"train_transaction.csv\", \"train_identity.csv\"]"
"files = [\"train_transaction.csv\", \"train_identity.csv\", \"train_transaction_2.csv\"]"
]
},
{
@ -110,30 +119,21 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"┌ Warning: inner joining data frames using join is deprecated, use `innerjoin(df1, df2, on=TransactionID, makeunique=false, validate=(false, false))` instead\n",
"│ caller = ip:0x0\n",
"└ @ Core :-1\n"
]
},
{
"data": {
"text/plain": [
"Dict{Any,Any} with 5 entries:\n",
" \"merge\" => 6.369\n",
" \"sort\" => 6.896\n",
" \"load_transactions\" => 28.601\n",
" \"aggregation\" => 6.078\n",
" \"load_identity\" => 0.307"
" \"merge\" => 4.816\n",
" \"sort\" => 8.381\n",
" \"load_transactions\" => 8.158\n",
" \"aggregation\" => 5.758\n",
" \"load_identity\" => 0.292"
]
},
"execution_count": 6,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
@ -157,7 +157,7 @@
"\n",
"# join\n",
"ts = now()\n",
"dff = join(df, df2, kind = :inner, on = \"TransactionID\")\n",
"dff = innerjoin(df, df2, on = \"TransactionID\")\n",
"te = now()\n",
"time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
"push!(s, \"merge\"=>time_in_sec)\n",
@ -550,22 +550,22 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"run_julia (generic function with 1 method)"
"run_julia (generic function with 2 methods)"
]
},
"execution_count": 4,
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"function run_julia()\n",
"function run_julia(threads=1)\n",
" s = Dict()\n",
" f = open(\"julia.csv\",\"a\")\n",
"\n",
@ -575,7 +575,7 @@
" te = now()\n",
" time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
" push!(s, \"load_transactions\"=>time_in_sec)\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|load_transactions|\",time_in_sec,\"\\n\"))\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia-\",threads,\"|load_transactions|\",time_in_sec,\"\\n\"))\n",
" \n",
"\n",
" # load identity ~25MB\n",
@ -584,7 +584,7 @@
" te = now()\n",
" time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
" push!(s, \"load_identity\"=>time_in_sec)\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|load_identity|\",time_in_sec,\"\\n\"))\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia-\",threads,\"|load_identity|\",time_in_sec,\"\\n\"))\n",
"\n",
" # join\n",
" ts = now()\n",
@ -592,7 +592,7 @@
" te = now()\n",
" time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
" push!(s, \"merge\"=>time_in_sec)\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|merge|\",time_in_sec,\"\\n\"))\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia-\",threads,\"|merge|\",time_in_sec,\"\\n\"))\n",
"\n",
" # group by\n",
" ts = now()\n",
@ -602,7 +602,7 @@
" te = now()\n",
" time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
" push!(s, \"aggregation\"=>time_in_sec)\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|aggregation|\",time_in_sec,\"\\n\"))\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia-\",threads,\"|aggregation|\",time_in_sec,\"\\n\"))\n",
"\n",
" # group by\n",
" ts = now()\n",
@ -612,7 +612,7 @@
" te = now()\n",
" time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
" push!(s, \"sort\"=>time_in_sec)\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|sorting|\",time_in_sec,\"\\n\"))\n",
" write(f,string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia-\",threads,\"|sorting|\",time_in_sec,\"\\n\"))\n",
" \n",
" close(f)\n",
" return s\n",
@ -621,7 +621,27 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"Threads.nthreads()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
@ -636,7 +656,7 @@
],
"source": [
"for i in 1:7\n",
" run_julia()\n",
" run_julia(Threads.nthreads())\n",
"end"
]
},
@ -660,6 +680,76 @@
"# expected csv output\n",
"string(Dates.format(now(), \"YYYY-mm-dd HH:MM:SS\"),\"|julia|load_identity|\",time_in_sec)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## How much does the compyling slow down the load"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"26.107"
]
}
],
"source": [
"ts = now()\n",
"df = CSV.read(joinpath(folder,files[1]), DataFrame)\n",
"te = now()\n",
"time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
"print(time_in_sec)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"8.401"
]
}
],
"source": [
"ts = now()\n",
"df = CSV.read(joinpath(folder,files[1]), DataFrame)\n",
"te = now()\n",
"time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
"print(time_in_sec)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"8.666"
]
}
],
"source": [
"ts = now()\n",
"df = CSV.read(joinpath(folder,files[3]), DataFrame)\n",
"te = now()\n",
"time_in_sec = (te-ts) / Millisecond(1) * (1 / 1000)\n",
"print(time_in_sec)"
]
}
],
"metadata": {

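The Julia benchmark appears above only in fragments; assembled into a single round, the steps from the intro cell look roughly like the sketch below. The grouping and sorting columns are an assumption, mirrored from the pandas version in the performance-test script further down, since the corresponding notebook cells are not fully visible in this diff.

using CSV, DataFrames, Dates, Statistics

folder = "/home/vaclav/Data/Kaggle/EEE-CIS_Fraud_Detection"
files = ["train_transaction.csv", "train_identity.csv"]

ts = now()
df = CSV.read(joinpath(folder, files[1]), DataFrame)   # transactions (the large file)
df2 = CSV.read(joinpath(folder, files[2]), DataFrame)  # identity (~25MB)
dff = innerjoin(df, df2, on = "TransactionID")         # replaces the deprecated join(..., kind = :inner)
# grouping/sorting columns assumed to mirror the pandas test below
grp = combine(groupby(dff, ["isFraud", "ProductCD", "card4", "card6", "id_15", "id_31"]),
              :TransactionAmt => mean, :TransactionAmt => sum)
srt = sort(dff, ["card1", "addr1", "D9"])
te = now()
println((te - ts) / Millisecond(1) * (1 / 1000))       # elapsed seconds, as computed in run_julia
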
View File

@ -0,0 +1,290 @@
import sys
import os
import gc
import psutil
from time import time, sleep, strftime, localtime
import pandas as pd
import dask.dataframe as dd
#import modin.pandas as mpd
import vaex
from pyspark.sql import SparkSession, functions
# data based on https://www.kaggle.com/c/ieee-fraud-detection/data
folder = "/home/vaclav/Data/Kaggle/EEE-CIS_Fraud_Detection"
files = ["train_transaction.csv", "train_identity.csv"]
paths = [os.path.join(folder, f) for f in files]
class Events:
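"""Appends benchmark events to a log file as timestamp|tool|operation|duration lines"""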
def __init__(self, path):
self.file = open(path, 'a', encoding='utf-8')
def log(self, time_end, tool, operation, duration):
self.file.write("|".join([strftime('%Y-%m-%d %H:%M:%S', localtime(time_end)),tool,operation,str(duration)])+"\n")
def close(self):
self.file.close()
def run_pandas(logger):
"""Performance test in pandas"""
s = {}
ts = time()
df = pd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts
logger.log(te, "pandas", "load_transactions", te-ts)
ts = time()
df2 = pd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts
logger.log(te, "pandas", "load_identity", te-ts)
ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts
logger.log(te, "pandas", "merge", te-ts)
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"].agg(["mean","sum"])
te = time()
s["aggregation"] = te-ts
logger.log(te, "pandas", "aggregation", te-ts)
ts = time()
dff.sort_values(by=["card1","addr1","D9"], inplace=True)
dff.sort_values(by=["addr1","D9","card1"], inplace=True)
dff.sort_values(by=["D9","card1","addr1"], inplace=True)
te = time()
s["sorting"] = te-ts
logger.log(te, "pandas", "sorting", te-ts)
return s
def run_dask(logger):
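"""Performance test in dask; dask is lazy, so .compute() triggers the actual work"""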
s = {}
ts = time()
df = dd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts
logger.log(te, "dask", "load_transactions", te-ts)
ts = time()
df2 = dd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts
logger.log(te, "dask", "load_identity", te-ts)
ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts
logger.log(te, "dask", "merge", te-ts)
# the difference from pandas is that dask is lazy; calling .compute() runs all the computations at this point
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"]\
.agg(["mean","sum"])\
.compute()
te = time()
s["aggregation"] = te-ts
logger.log(te, "dask", "aggregation", te-ts)
# parallel sorting is tricky, which is why dask offers only workarounds; set_index sorts by a single column
ts = time()
dff.set_index("card1").compute()
te = time()
s["sorting"] = te-ts
logger.log(te, "dask", "sorting", te-ts)
def run_vaex(logger):
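"""Performance test in vaex"""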
s = {}
ts = time()
df = vaex.open(paths[0])
te = time()
s["load_transactions"] = te-ts
logger.log(te, "vaex", "load_transactions", te-ts)
ts = time()
df2 = vaex.open(paths[1])
te = time()
s["load_identity"] = te-ts
logger.log(te, "vaex", "load_identity", te-ts)
ts = time()
dff = df.join(df2, on="TransactionID")
te = time()
s["merge"] = te-ts
logger.log(te, "vaex", "merge", te-ts)
# vaex computes the aggregation directly, no explicit compute call is needed
ts = time()
grp = dff.groupby([dff["isFraud"],dff["ProductCD"],dff["card4"],dff["card6"],dff["id_15"],dff["id_31"]],
agg=[vaex.agg.mean('TransactionAmt'), vaex.agg.sum('TransactionAmt')])
te = time()
s["aggregation"] = te-ts
logger.log(te, "vaex", "aggregation", te-ts)
# sort by three different column orders, mirroring the pandas test
ts = time()
dff_s = dff.sort(by=["card1","addr1","D9"])
dff_s = dff.sort(by=["addr1","D9","card1"])
dff_s = dff.sort(by=["D9","card1","addr1"])
te = time()
s["sorting"] = te-ts
logger.log(te, "vaex", "sorting", te-ts)
def run_spark(my_spark, logger):
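"""Performance test in pyspark; transformations are lazy until .collect()"""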
s = {}
tool = "spark"
s = {}
ts = time()
df = my_spark.read.csv(paths[0],inferSchema = True,header= True)
te = time()
s["load_transactions"] = te-ts
logger.log(te, "spark", "load_transactions", te-ts)
ts = time()
df2 = my_spark.read.csv(paths[1],inferSchema = True,header= True)
te = time()
s["load_identity"] = te-ts
logger.log(te, "spark", "load_identity", te-ts)
ts = time()
dff = df.join(df2, "TransactionID")
te = time()
s["merge"] = te-ts
logger.log(te, "spark", "merge", te-ts)
# the difference from pandas is that spark is lazy; calling .collect() runs all the computations at this point
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"]) \
.agg(functions.avg("TransactionAmt"), functions.sum("TransactionAmt"))\
.collect()
te = time()
s["aggregation"] = te-ts
logger.log(te, "spark", "aggregation", te-ts)
ts = time()
dff.orderBy("card1","addr1","D9").collect()
# alternatively
# dff.sort("card1","addr1","D9").collect()
te = time()
s["sorting"] = te-ts
logger.log(te, "spark", "sorting", te-ts)
def run_modin(logger):
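"""Performance test in modin; the multi-column aggregation currently falls back to pandas"""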
s = {}
ts = time()
df = mpd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts
logger.log(te, "modin", "load_transactions", te-ts)
ts = time()
df2 = mpd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts
logger.log(te, "modin", "load_identity", te-ts)
ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts
logger.log(te, "modin", "merge", te-ts)
# modin falls back to pandas for multi-column aggregation and then fails with a KeyError, even though the key is present
ts = time()
try:
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"].agg(["mean","sum"])
except Exception as e:
print(e)
te = time()
s["aggregation"] = te-ts
logger.log(te, "modin", "aggregation", te-ts)
def system_resources(n, pause, cpu_threshold = 0.5, mem_threshold = 0.5):
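"""Average CPU and memory usage over n samples taken pause seconds apart and flag whether they stay under the thresholds"""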
cpu = []
mem = []
cpu_within_limit = True
mem_within_limit = True
for i in range(n):
cpu.append(psutil.cpu_percent())
mem.append(psutil.virtual_memory().percent)
sleep(pause)
cpu = sum(cpu)/n
mem = sum(mem)/n
if cpu / 100 > cpu_threshold:
cpu_within_limit = False
if mem / 100 > mem_threshold:
mem_within_limit = False
return {"cpu": cpu, "memory": mem, "cpu_limit": cpu_within_limit, "mem_limit": mem_within_limit }
def clean(wait_time: int=15):
"""Cleans created DataFrames and call the garbage collector to actions. Wait for 15s by default"""
df, df2, dff, grp = None, None, None, None
gc.collect()
sleep(wait_time)
return None
def check_resources():
# if memory or cpu usage is high, clean resources and wait 60s
res_breached = True
while res_breached:
res = system_resources(3,1)
if res["mem_limit"] and res["cpu_limit"]:
res_breached = False
else:
print(f"CPU/Memory over limit {res}")
clean(60)
if __name__ == "__main__":
# logging
logger = Events("l_2.log")
# Create my_spark
my_spark = SparkSession.builder \
.master("local") \
.appName("Pandas Alternative") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# 7 rounds
for i in range(7):
check_resources()
print(f"{i} pandas")
run_pandas(logger)
check_resources()
print(f"{i} vaex")
run_vaex(logger)
check_resources()
print(f"{i} dask")
run_dask(logger)
check_resources()
print(f"{i} spark")
run_spark(my_spark, logger)
check_resources()
#run_modin(logger)
#check_resources()
logger.close()
my_spark.stop()

View File

@ -0,0 +1,168 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Process performance test log\n",
"In this notebook we process the performance test results generated by `Performance_test.py`. "
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import plotly.express as px\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"df = pd.read_csv(\"l.log\", sep=\"|\", header=None, names=[\"timestamp\",\"process\",\"step\",\"time\"])\n",
"j = pd.read_csv(\"julia.csv\", sep=\"|\", header=None, names=[\"timestamp\",\"process\",\"step\",\"time\"])"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"df = pd.concat([df,j])"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array(['pandas', 'vaex', 'dask', 'spark', 'julia-first', 'julia',\n",
" 'julia-4-first', 'julia-4'], dtype=object)"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df[\"process\"].unique()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"d = df[df[\"process\"]==\"spark\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"px.bar(d, x=\"step\", y=\"time\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"res = df.groupby([\"process\",\"step\"])[\"time\"].mean().reset_index()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pandas vs Dask\n",
"r = res[res[\"process\"].isin([\"pandas\",\"dask\"])]\n",
"fig = px.bar(r, color=\"process\", y=\"time\", x=\"step\", barmode=\"group\", \n",
" title=\"Pandas vs Dask\", \n",
" category_orders={\"process\":[\"pandas\",\"dask\"]}, \n",
" color_discrete_sequence=[\"blue\",\"forestgreen\"])\n",
"fig.update_layout(xaxis={'categoryorder':'array', 'categoryarray':[\"load_transactions\",\"load_identity\",\"merge\"]})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pandas vs Spark\n",
"r = res[res[\"process\"].isin([\"pandas\",\"spark\"])]\n",
"fig = px.bar(r, color=\"process\", y=\"time\", x=\"step\", barmode=\"group\", \n",
" title=\"Pandas vs Spark\", \n",
" category_orders={\"process\":[\"pandas\",\"spark\"]}, \n",
" color_discrete_sequence=[\"blue\",\"forestgreen\"])\n",
"fig.update_layout(xaxis={'categoryorder':'array', 'categoryarray':[\"load_transactions\",\"load_identity\",\"merge\"]})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pandas vs Vaex\n",
"r = res[res[\"process\"].isin([\"pandas\",\"vaex\"])]\n",
"fig = px.bar(r, color=\"process\", y=\"time\", x=\"step\", barmode=\"group\", title=\"Pandas vs Vaex\", color_discrete_sequence=[\"blue\",\"forestgreen\"])\n",
"fig.update_layout(xaxis={'categoryorder':'array', 'categoryarray':[\"load_transactions\",\"load_identity\",\"merge\"]})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pandas vs Julia\n",
"processes = [\"pandas\",\"julia-first\",\"julia\",\"julia-4-first\",\"julia-4\"]\n",
"r = res[res[\"process\"].isin(processes)]\n",
"fig = px.bar(r, color=\"process\", y=\"time\", x=\"step\", \n",
" barmode=\"group\", title=\"Pandas vs Julia\", \n",
" category_orders={\"process\":processes}, \n",
" color_discrete_sequence=[\"blue\",\"lightgreen\",\"forestgreen\",\"orchid\",\"darkorchid\"])\n",
"fig.update_layout(xaxis={'categoryorder':'array', 'categoryarray':[\"load_transactions\",\"load_identity\",\"merge\"]})"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}