Initial commit

pull/1/head
Mike Kraus 2019-10-25 13:41:33 +02:00
commit 4fbfc81e84
40 changed files with 1527 additions and 0 deletions

BIN
.DS_Store vendored 100644

Binary file not shown.

2
.gitattributes vendored 100644

@@ -0,0 +1,2 @@
# Auto detect text files and perform LF normalization
* text=auto

123
.gitignore vendored 100644

@@ -0,0 +1,123 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

1
.idea/.name 100644

@@ -0,0 +1 @@
incremental_training

5
.idea/codeStyles/codeStyleConfig.xml 100644

@@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

4
.idea/encodings.xml 100644

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding" addBOMForNewFiles="with NO BOM" />
</project>

18
.idea/incremental_training.iml 100644

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$/../incremental_training" />
<orderEntry type="jdk" jdkName="Python 3.7 (MK)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="requirementsPath" value="" />
</component>
<component name="PyDocumentationSettings">
<option name="analyzeDoctest" value="false" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>

4
.idea/misc.xml 100644

@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (MK)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml 100644

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/../incremental_training/.idea/incremental_training.iml" filepath="$PROJECT_DIR$/../incremental_training/.idea/incremental_training.iml" />
</modules>
</component>
</project>

739
.idea/workspace.xml 100644

@@ -0,0 +1,739 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ChangeListManager">
<list default="true" id="f3f89d3d-fe06-4be5-b478-61463c4bc31a" name="Default Changelist" comment="" />
<option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="FileEditorManager">
<leaf SIDE_TABS_SIZE_LIMIT_KEY="300">
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/initial_model_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="221">
<caret line="48" column="32" selection-start-line="48" selection-start-column="32" selection-end-line="48" selection-end-column="51" />
<folding>
<element signature="e#1#15#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/stream_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="167">
<caret line="26" column="44" selection-start-line="26" selection-start-column="44" selection-end-line="26" selection-end-column="69" />
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/update_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="14" column="51" lean-forward="true" selection-start-line="14" selection-start-column="51" selection-end-line="14" selection-end-column="51" />
<folding>
<element signature="e#1#15#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/docker-compose-project.yml">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="14" column="23" selection-start-line="14" selection-start-column="23" selection-end-line="14" selection-end-column="23" />
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/src/data/kafka_producer.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="202">
<caret line="23" column="35" selection-start-line="23" selection-start-column="35" selection-end-line="23" selection-end-column="35" />
<folding>
<element signature="e#0#22#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/src/models/initial_model_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="1232">
<caret line="89" column="4" lean-forward="true" selection-start-line="89" selection-start-column="4" selection-end-line="89" selection-end-column="69" />
<folding>
<element signature="e#1#13#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/dags/src/models/update_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="75">
<caret line="5" column="19" lean-forward="true" selection-start-line="5" selection-start-column="19" selection-end-line="5" selection-end-column="19" />
<folding>
<element signature="e#1#13#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/src/__init__.py">
<provider selected="true" editor-type-id="text-editor" />
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/src/preprocessing/preprocessing_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="105">
<caret line="7" column="69" selection-start-line="7" selection-start-column="69" selection-end-line="7" selection-end-column="69" />
</state>
</provider>
</entry>
</file>
<file pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/dags/src/data/data_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-775">
<caret line="9" column="11" selection-start-line="9" selection-start-column="11" selection-end-line="9" selection-end-column="11" />
<folding>
<element signature="e#0#47#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
</leaf>
</component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Python Script" />
</list>
</option>
</component>
<component name="FindInProjectRecents">
<findStrings>
<find>ip</find>
<find>new_data</find>
<find>kwargs['path_new_data']</find>
<find>input_shape</find>
<find>num_</find>
<find>stream</find>
<find>stream_sample</find>
<find>num_classes</find>
<find>mlflow.keras</find>
<find>load_base</find>
</findStrings>
<replaceStrings>
<replace>NUM_CLASSES</replace>
</replaceStrings>
</component>
<component name="IdeDocumentHistory">
<option name="CHANGED_PATHS">
<list>
<option value="$PROJECT_DIR$/kafka_scripts/kafka_consumer.py" />
<option value="$PROJECT_DIR$/src/data/data_functions_minst.py" />
<option value="$PROJECT_DIR$/src/preprocessing/preprocessing_functions.py" />
<option value="$PROJECT_DIR$/src/models/scoring_functions.py" />
<option value="$PROJECT_DIR$/src/models/model_functions.py" />
<option value="$PROJECT_DIR$/src/models/initial_model_functions.py" />
<option value="$PROJECT_DIR$/src/models/retrain_functions.py" />
<option value="$PROJECT_DIR$/src/kafka/kafka_consumer.py" />
<option value="$PROJECT_DIR$/src/kafka/kafka_producer.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/kafka_consumer.py" />
<option value="$PROJECT_DIR$/dags/mlruns/0/meta.yaml" />
<option value="$PROJECT_DIR$/dags/mlruns/0/e802ef6f87354aba99618ee14d7ab4dd/meta.yaml" />
<option value="$PROJECT_DIR$/dags/mlruns/0/5fbdae50dcb149d18cc6e4330df894a0/meta.yaml" />
<option value="$PROJECT_DIR$/airflow_docker/Dockerfile" />
<option value="$PROJECT_DIR$/../../../signature (2).html" />
<option value="$PROJECT_DIR$/mlflow_docker/Dockerfile" />
<option value="$PROJECT_DIR$/dags/src/models/test.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/test.prod.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/test_cons_1.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/test_prod.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/test_cons.py" />
<option value="$PROJECT_DIR$/airflow_docker/requirements.txt" />
<option value="$PROJECT_DIR$/../../../docker_pg/docker-compose_pg.yml" />
<option value="$PROJECT_DIR$/dags/main.py" />
<option value="$PROJECT_DIR$/dags/src/kafka/kafka_producer.py" />
<option value="$PROJECT_DIR$/docker-compose-airflow_postgres.yml" />
<option value="$PROJECT_DIR$/dags/operator_MK.py" />
<option value="$PROJECT_DIR$/dags/retrain_operator.py" />
<option value="$PROJECT_DIR$/dags/initial_model_operator.py" />
<option value="$PROJECT_DIR$/dags/stream_generator.py" />
<option value="$PROJECT_DIR$/Kubernetes/train.py" />
<option value="$PROJECT_DIR$/Kubernetes/requirements.txt" />
<option value="$PROJECT_DIR$/Kubernetes/Dockerfile" />
<option value="$PROJECT_DIR$/Kubernetes/docker-compose-project.yml" />
<option value="$PROJECT_DIR$/Kubernetes/docker-compose.yml" />
<option value="$PROJECT_DIR$/Kubernetes/train_minikube.yml" />
<option value="$PROJECT_DIR$/Kubernetes/deploy_minikube.yaml" />
<option value="$PROJECT_DIR$/Kubernetes/serve/serve.py" />
<option value="$PROJECT_DIR$/Kubernetes/serve/Dockerfile" />
<option value="$PROJECT_DIR$/Kubernetes/serve/deploy_minikube.yaml" />
<option value="$PROJECT_DIR$/dags/initial_model_dag.py" />
<option value="$PROJECT_DIR$/dags/stream_DAG.py" />
<option value="$PROJECT_DIR$/dags/initial_model_DAG.py" />
<option value="$PROJECT_DIR$/docker-compose-project.yml" />
<option value="$PROJECT_DIR$/dags/src/data/data_functions.py" />
<option value="$PROJECT_DIR$/dags/src/data/kafka_producer.py" />
<option value="$PROJECT_DIR$/dags/src/models/initial_model_functions.py" />
<option value="$PROJECT_DIR$/dags/src/models/retrain_functions.py" />
<option value="$PROJECT_DIR$/dags/src/preprocessing/preprocessing_functions.py" />
<option value="$PROJECT_DIR$/dags/update_DAG.py" />
<option value="$PROJECT_DIR$/dags/src/models/update_functions.py" />
</list>
</option>
</component>
<component name="ProjectFrameBounds">
<option name="x" value="2032" />
<option name="y" value="140" />
<option name="width" value="1454" />
<option name="height" value="934" />
</component>
<component name="ProjectView">
<navigator proportions="" version="1">
<foldersAlwaysOnTop value="true" />
</navigator>
<panes>
<pane id="ProjectPane">
<subPane>
<expand>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="airflow_docker" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="dags" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="dags" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="dags" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="data" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="dags" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="models" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="dags" type="462c0819:PsiDirectoryNode" />
<item name="src" type="462c0819:PsiDirectoryNode" />
<item name="preprocessing" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="data" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="mlflow_docker" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="models" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="models" type="462c0819:PsiDirectoryNode" />
<item name="archive" type="462c0819:PsiDirectoryNode" />
</path>
<path>
<item name="blog_online_training" type="b2602c69:ProjectViewProjectNode" />
<item name="blog_online_training" type="462c0819:PsiDirectoryNode" />
<item name="models" type="462c0819:PsiDirectoryNode" />
<item name="current_model" type="462c0819:PsiDirectoryNode" />
</path>
</expand>
<select />
</subPane>
</pane>
<pane id="Scope" />
</panes>
</component>
<component name="PropertiesComponent">
<property name="last_opened_file_path" value="$PROJECT_DIR$" />
<property name="settings.editor.selected.configurable" value="com.jetbrains.python.configuration.PyIntegratedToolsModulesConfigurable" />
</component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/Kubernetes/serve" />
<recent name="$PROJECT_DIR$/Kubernetes/train" />
<recent name="$PROJECT_DIR$/dags/src/data" />
<recent name="$PROJECT_DIR$/dags" />
<recent name="$PROJECT_DIR$" />
</key>
<key name="CopyFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/Kubernetes/serve" />
<recent name="$PROJECT_DIR$/Kubernetes" />
<recent name="$PROJECT_DIR$/dags" />
<recent name="$PROJECT_DIR$/airflow_docker" />
<recent name="$PROJECT_DIR$/dags/src/kafka" />
</key>
</component>
<component name="RunDashboard">
<option name="ruleStates">
<list>
<RuleState>
<option name="name" value="ConfigurationTypeDashboardGroupingRule" />
</RuleState>
<RuleState>
<option name="name" value="StatusDashboardGroupingRule" />
</RuleState>
</list>
</option>
</component>
<component name="RunManager" selected="Python.main">
<configuration default="true" type="PythonConfigurationType" factoryName="Python">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="kafka_producer" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/src/kafka" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/src/kafka/kafka_producer.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="main" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/dags" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/dags/main.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="test" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/dags/src/models" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/dags/src/models/test.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="test_cons" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/dags/src/kafka" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/dags/src/kafka/test_cons.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="test_prod" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/dags/src/kafka" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/dags/src/kafka/test_prod.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration default="true" type="Tox" factoryName="Tox">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<method v="2" />
</configuration>
<configuration default="true" type="tests" factoryName="Doctests">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="SCRIPT_NAME" value="" />
<option name="CLASS_NAME" value="" />
<option name="METHOD_NAME" value="" />
<option name="FOLDER_NAME" value="" />
<option name="TEST_TYPE" value="TEST_SCRIPT" />
<option name="PATTERN" value="" />
<option name="USE_PATTERN" value="false" />
<method v="2" />
</configuration>
<configuration default="true" type="tests" factoryName="Nosetests">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="_new_regexPattern" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<method v="2" />
</configuration>
<configuration default="true" type="tests" factoryName="Twisted Trial">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<method v="2" />
</configuration>
<configuration default="true" type="tests" factoryName="Unittests">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<method v="2" />
</configuration>
<configuration default="true" type="tests" factoryName="py.test">
<module name="incremental_training" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<option name="_new_keywords" value="&quot;&quot;" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python.main" />
<item itemvalue="Python.test_cons" />
<item itemvalue="Python.test_prod" />
<item itemvalue="Python.test" />
<item itemvalue="Python.kafka_producer" />
</list>
</recent_temporary>
</component>
<component name="SvnConfiguration">
<configuration />
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="f3f89d3d-fe06-4be5-b478-61463c4bc31a" name="Default Changelist" comment="" />
<created>1566206779139</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1566206779139</updated>
</task>
<servers />
</component>
<component name="TodoView">
<todo-panel id="selected-file">
<is-autoscroll-to-source value="true" />
</todo-panel>
<todo-panel id="all">
<are-packages-shown value="true" />
<is-autoscroll-to-source value="true" />
</todo-panel>
</component>
<component name="ToolWindowManager">
<frame x="2032" y="140" width="1454" height="934" extended-state="0" />
<layout>
<window_info content_ui="combo" id="Project" order="0" visible="true" weight="0.25610608" />
<window_info id="Structure" order="1" side_tool="true" weight="0.25" />
<window_info id="Favorites" order="2" side_tool="true" />
<window_info anchor="bottom" id="Message" order="0" />
<window_info anchor="bottom" id="Find" order="1" weight="0.32967034" />
<window_info anchor="bottom" id="Run" order="2" weight="0.32722834" />
<window_info anchor="bottom" id="Debug" order="3" weight="0.39937598" />
<window_info anchor="bottom" id="Cvs" order="4" weight="0.25" />
<window_info anchor="bottom" id="Inspection" order="5" weight="0.4" />
<window_info anchor="bottom" id="TODO" order="6" weight="0.32967034" />
<window_info anchor="bottom" id="Version Control" order="7" />
<window_info active="true" anchor="bottom" id="Terminal" order="8" visible="true" weight="0.5457876" />
<window_info anchor="bottom" id="Event Log" order="9" side_tool="true" />
<window_info anchor="bottom" id="Python Console" order="10" />
<window_info anchor="right" id="Commander" internal_type="SLIDING" order="0" type="SLIDING" weight="0.4" />
<window_info anchor="right" id="Ant Build" order="1" weight="0.25" />
<window_info anchor="right" content_ui="combo" id="Hierarchy" order="2" weight="0.25" />
</layout>
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/kafka/kafka_consumer_NI.py" />
<entry file="file://$PROJECT_DIR$/docker-compose-postgres.yml" />
<entry file="file:///anaconda3/envs/DE/lib/python3.6/site-packages/kafka/client_async.py" />
<entry file="file://$PROJECT_DIR$/src/main.py" />
<entry file="file://$PROJECT_DIR$/src/models/scoring_functions.py" />
<entry file="file://$PROJECT_DIR$/data/__init__.py">
<provider selected="true" editor-type-id="text-editor" />
</entry>
<entry file="file://$PROJECT_DIR$/src/kafka/kafka_transfer.py" />
<entry file="file://$PROJECT_DIR$/docker-compose-kafka.yml" />
<entry file="file://$PROJECT_DIR$/dags/mlruns/0/meta.yaml" />
<entry file="file://$PROJECT_DIR$/dags/mlruns/0/e802ef6f87354aba99618ee14d7ab4dd/meta.yaml" />
<entry file="file://$PROJECT_DIR$/dags/mlruns/0/5fbdae50dcb149d18cc6e4330df894a0/meta.yaml" />
<entry file="file://$PROJECT_DIR$/../../../signature (2).html" />
<entry file="file:///anaconda3/envs/DE/lib/python3.6/site-packages/mlflow/utils/rest_utils.py" />
<entry file="file:///anaconda3/envs/DE/lib/python3.6/site-packages/mlflow/tracking/fluent.py" />
<entry file="file:///anaconda3/envs/DE/lib/python3.6/site-packages/mlflow/store/rest_store.py" />
<entry file="file://$PROJECT_DIR$/dags/src/kafka/test_cons_1.py" />
<entry file="file://$PROJECT_DIR$/../../Data_Engineering/Data_Engineering_Mike/docker-compose-single-broker.yml">
<provider selected="true" editor-type-id="text-editor" />
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/kafka/test_prod.py" />
<entry file="file://$PROJECT_DIR$/../../../docker_pg/docker-compose_pg.yml" />
<entry file="file://$PROJECT_DIR$/dags/main.py" />
<entry file="file://$PROJECT_DIR$/dags/src/models/test.py" />
<entry file="file://$PROJECT_DIR$/dags/src/kafka/test_cons.py" />
<entry file="file://$PROJECT_DIR$/dags/src/data/data_functions_minst.py" />
<entry file="file://$PROJECT_DIR$/dags/src/kafka/kafka_consumer.py" />
<entry file="file://$PROJECT_DIR$/airflow_docker/requirements_test.txt" />
<entry file="file://$PROJECT_DIR$/MNIST.ipynb">
<provider selected="true" editor-type-id="ipnb-editor">
<state>
<selected id="0" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/train/requirements.txt" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/train/train_minikube.yml" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/train/train.py" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/train/Dockerfile" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/serve/Dockerfile" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/serve/deploy_minikube.yaml" />
<entry file="file://$PROJECT_DIR$/Kubernetes_Day_2/serve/serve.py" />
<entry file="file://$PROJECT_DIR$/airflow_docker/requirements.txt">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="60">
<caret line="4" column="14" lean-forward="true" selection-start-line="4" selection-start-column="14" selection-end-line="4" selection-end-column="14" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/mlflow_docker/Dockerfile">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="60">
<caret line="4" column="11" lean-forward="true" selection-start-line="4" selection-start-column="11" selection-end-line="4" selection-end-column="11" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/airflow_docker/Dockerfile">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="90">
<caret line="6" lean-forward="true" selection-start-line="6" selection-end-line="6" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/stream_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="167">
<caret line="26" column="44" selection-start-line="26" selection-start-column="44" selection-end-line="26" selection-end-column="69" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/initial_model_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="221">
<caret line="48" column="32" selection-start-line="48" selection-start-column="32" selection-end-line="48" selection-end-column="51" />
<folding>
<element signature="e#1#15#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/docker-compose-project.yml">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="14" column="23" selection-start-line="14" selection-start-column="23" selection-end-line="14" selection-end-column="23" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/__init__.py">
<provider selected="true" editor-type-id="text-editor" />
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/models/initial_model_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="1232">
<caret line="89" column="4" lean-forward="true" selection-start-line="89" selection-start-column="4" selection-end-line="89" selection-end-column="69" />
<folding>
<element signature="e#1#13#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/preprocessing/preprocessing_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="105">
<caret line="7" column="69" selection-start-line="7" selection-start-column="69" selection-end-line="7" selection-end-column="69" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/data/data_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-775">
<caret line="9" column="11" selection-start-line="9" selection-start-column="11" selection-end-line="9" selection-end-column="11" />
<folding>
<element signature="e#0#47#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/data/kafka_producer.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="202">
<caret line="23" column="35" selection-start-line="23" selection-start-column="35" selection-end-line="23" selection-end-column="35" />
<folding>
<element signature="e#0#22#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/update_DAG.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="210">
<caret line="14" column="51" lean-forward="true" selection-start-line="14" selection-start-column="51" selection-end-line="14" selection-end-column="51" />
<folding>
<element signature="e#1#15#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/dags/src/models/update_functions.py">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="75">
<caret line="5" column="19" lean-forward="true" selection-start-line="5" selection-start-column="19" selection-end-line="5" selection-end-column="19" />
<folding>
<element signature="e#1#13#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</component>
</project>

2
README.md 100644

@@ -0,0 +1,2 @@
# incremental_training
incremental training with Airflow, Kafka and MLFlow

6
airflow_docker/Dockerfile 100644

@@ -0,0 +1,6 @@
FROM puckel/docker-airflow:1.10.2
WORKDIR /usr/local/airflow/
COPY requirements.txt .
RUN pip install --user -r requirements.txt

10
airflow_docker/requirements.txt 100644

@@ -0,0 +1,10 @@
kafka-python==1.4.6
Keras==2.2.5
mlflow==1.2.0
numpy==1.17.1
pandas==0.25.1
scikit-learn==0.21.3
scipy==1.3.1
tensorboard==1.14.0
tensorflow==1.14.0

BIN
dags/.DS_Store vendored 100644

Binary file not shown.

0
dags/__init__.py 100644

49
dags/initial_model_DAG.py 100755

@@ -0,0 +1,49 @@
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from src.models.initial_model_functions import load_preprocess, fit_model

PATH_STREAM_SAMPLE = "/data/stream_sample.p"
PATH_TEST_SET = "/data/test_set.p"
INITIAL_MODEL_PATH = "/models/current_model/initial_model.H5"

BATCH_SIZE = 128
NUM_CLASSES = 10
EPOCHS = 1

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),  # this in combination with catchup=False ensures that the DAG is triggered from the current date onwards at the set interval
    'provide_context': True,  # set to True as we want to pass variables on from one task to another
}

dag = DAG(
    dag_id='initial_model_DAG',
    default_args=args,
    schedule_interval='@once',  # set interval
    catchup=False,  # indicate whether or not Airflow should do any runs for intervals between the start_date and the current date that haven't been run thus far
)

task1 = PythonOperator(
    task_id='load_preprocess',
    python_callable=load_preprocess,  # function to be executed
    op_kwargs={'path_stream_sample': PATH_STREAM_SAMPLE,  # input arguments
               'path_test_set': PATH_TEST_SET},
    dag=dag,
)

task2 = PythonOperator(
    task_id='fit_model',
    python_callable=fit_model,
    op_kwargs={'batch_size': BATCH_SIZE,
               'epochs': EPOCHS,
               'num_classes': NUM_CLASSES,
               'initial_model_path': INITIAL_MODEL_PATH},
    dag=dag,
)

task1 >> task2  # set task priority

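Since provide_context=True hands each callable the Airflow task instance, fit_model picks up the return value of load_preprocess via XCom. The sketch below mimics that hand-off outside Airflow for a quick local smoke test; FakeTI is a hypothetical stand-in (not part of this commit) for the one TaskInstance method the callables use, and it assumes the data/ and models/ folders exist under the working directory.

# Hypothetical local smoke test of the XCom hand-off between the two tasks above.
from src.models.initial_model_functions import load_preprocess, fit_model

class FakeTI:
    # mimics the only TaskInstance behaviour the callables rely on
    def __init__(self):
        self._xcoms = {}

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]

ti = FakeTI()
# load_preprocess returns (x_train, y_train, x_test, y_test, input_shape)
ti._xcoms['load_preprocess'] = load_preprocess(
    path_stream_sample="/data/stream_sample.p", path_test_set="/data/test_set.p")
fit_model(ti=ti, num_classes=10, batch_size=128, epochs=1,
          initial_model_path="/models/current_model/initial_model.H5")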
BIN
dags/src/.DS_Store vendored 100644

Binary file not shown.

0
dags/src/__init__.py 100644

BIN
dags/src/data/.DS_Store vendored 100644

Binary file not shown.

81
dags/src/data/data_functions.py 100644

@@ -0,0 +1,81 @@
from kafka import KafkaConsumer, TopicPartition
from json import loads
import numpy as np
import time
import pickle
import os
import logging


def decode_json(jsons_comb):
    x_train = loads(jsons_comb[0])
    y_train = loads(jsons_comb[1])
    return x_train, y_train


def get_data_from_kafka(**kwargs):
    consumer = KafkaConsumer(
        kwargs['topic'],  # specify the topic to consume from
        bootstrap_servers=[kwargs['client']],
        consumer_timeout_ms=3000,  # break the connection if the consumer hasn't fetched anything for 3 secs (e.g. in case of an empty topic)
        auto_offset_reset='earliest',  # automatically reset the offset to the earliest offset (should the current offset be deleted)
        enable_auto_commit=True,  # offsets are committed automatically by the consumer
        #group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))
    logging.info('Consumer constructed')
    try:
        xs = []
        ys = []
        for message in consumer:  # loop over messages
            logging.info("Offset: %s", message.offset)
            message = message.value
            x, y = decode_json(message)  # decode JSON
            xs.append(x)
            ys.append(y)
            logging.info('Image retrieved from topic')
        xs = np.array(xs).reshape(-1, 28, 28, 1)  # put Xs in the right shape for our CNN
        ys = np.array(ys).reshape(-1)  # put ys in the right shape for our CNN
        new_samples = [xs, ys]
        pickle.dump(new_samples, open(os.getcwd()+kwargs['path_new_data']+str(time.strftime("%Y%m%d_%H%M"))+"_new_samples.p", "wb"))  # write data
        logging.info(str(xs.shape[0])+' new samples retrieved')
        consumer.close()
    except Exception as e:
        print(e)
        logging.info('Error: ' + str(e))


def load_data(**kwargs):
    # Load the Kafka-fetched data that is stored in the to_use_for_model_update folder
    for file_d in os.listdir(os.getcwd()+kwargs['path_new_data']):
        if 'new_samples.p' in file_d:
            new_samples = pickle.load(open(os.getcwd()+kwargs['path_new_data'] + file_d, "rb"))
            test_set = pickle.load(open(os.getcwd()+kwargs['path_test_set'], "rb"))
            logging.info('data loaded')
            return [new_samples, test_set]
        else:
            logging.info('no data found')

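get_data_from_kafka and load_data are written to be driven by Airflow's op_kwargs, but they accept the same keyword arguments directly, which makes a standalone check easy. A sketch, assuming a reachable broker and the folder layout configured in update_DAG.py:

# Direct invocation with the same kwargs the DAG passes (illustrative only).
from src.data.data_functions import get_data_from_kafka, load_data

get_data_from_kafka(topic='TopicA', client='kafka:9092',
                    path_new_data='/data/to_use_for_model_update/')
new_samples, test_set = load_data(path_new_data='/data/to_use_for_model_update/',
                                  path_test_set='/data/test_set.p')
print(new_samples[0].shape, new_samples[1].shape)  # e.g. (200, 28, 28, 1) (200,)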
37
dags/src/data/kafka_producer.py 100644

@@ -0,0 +1,37 @@
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import pickle
import os
import logging


def encode_to_json(x_train, y_train):
    x = dumps(x_train.tolist())
    y = dumps(y_train.tolist())
    jsons_comb = [x, y]
    return jsons_comb


def generate_stream(**kwargs):
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],  # set up the producer
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    stream_sample = pickle.load(open(os.getcwd() + kwargs['path_stream_sample'], "rb"))  # load the stream sample file
    rand = random.sample(range(0, 20000), 200)  # the stream sample consists of 20000 observations - 200 of them are selected at random
    x_new = stream_sample[0]
    y_new = stream_sample[1]
    logging.info('Partitions: %s', producer.partitions_for('TopicA'))
    for i in rand:
        json_comb = encode_to_json(x_new[i], y_new[i])  # pick an observation and encode it to JSON
        producer.send('TopicA', value=json_comb)  # send the encoded observation to the Kafka topic
        logging.info("Sent number: {}".format(y_new[i]))
        sleep(1)
    producer.close()

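The producer JSON-encodes each observation twice - once in encode_to_json and once more in the value_serializer - and the consumer side (the value_deserializer plus decode_json in data_functions.py) undoes both layers symmetrically. A minimal round-trip check of that encoding, runnable without a broker:

# Round-trip check of the double JSON encoding used above (no broker needed).
import numpy as np
from json import dumps, loads

x = np.random.rand(28, 28, 1).astype('float32')
y = np.int64(7)

jsons_comb = [dumps(x.tolist()), dumps(y.tolist())]       # encode_to_json
wire = dumps(jsons_comb).encode('utf-8')                  # value_serializer
x2, y2 = (loads(s) for s in loads(wire.decode('utf-8')))  # deserializer + decode_json

assert np.allclose(np.array(x2), x) and y2 == 7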
123
dags/src/models/initial_model_functions.py 100644

@@ -0,0 +1,123 @@
import keras
from keras.datasets import mnist
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import pickle
import logging
import os


def load_preprocess(**kwargs):
    # load and preprocess MNIST data for the initial model fit
    img_rows, img_cols = 28, 28

    # split data between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # convert images into the right shape
    # source: https://keras.io/examples/mnist_cnn/
    if K.image_data_format() == 'channels_first':
        x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
        x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')

    # normalize data
    x_train /= 255
    x_test /= 255

    # set train samples apart that will serve as streaming data later on
    x_stream = x_train[:20000]
    y_stream = y_train[:20000]
    x_train = x_train[20000:]
    y_train = y_train[20000:]
    stream_sample = [x_stream, y_stream]
    pickle.dump(stream_sample, open(os.getcwd() + kwargs['path_stream_sample'], "wb"))

    # store the test set
    test_set = [x_test, y_test]
    pickle.dump(test_set, open(os.getcwd() + kwargs['path_test_set'], "wb"))
    return x_train, y_train, x_test, y_test, input_shape


def construct_model(num_classes, input_shape):
    # construct the model architecture
    # source: https://keras.io/examples/mnist_cnn/
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=keras.optimizers.Adadelta(),
                  metrics=['accuracy'])
    return model


def fit_model(**kwargs):
    # fit the model on the preprocessed data with the constructed model architecture
    ti = kwargs['ti']
    loaded = ti.xcom_pull(task_ids='load_preprocess')
    logging.info('variables successfully fetched from previous task')
    x_train = loaded[0]
    y_train = loaded[1]
    x_test = loaded[2]
    y_test = loaded[3]
    input_shape = loaded[4]
    num_classes = kwargs['num_classes']

    # convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)

    # construct & fit
    model = construct_model(num_classes, input_shape)
    model.fit(x_train, y_train,
              batch_size=kwargs['batch_size'],
              epochs=kwargs['epochs'],
              verbose=1,
              validation_data=(x_test, y_test))

    # evaluate
    score = model.evaluate(x_test, y_test, verbose=0)
    logging.info('Test - loss: %s', score[0])
    logging.info('Test - accuracy: %s', score[1])
    model.save(os.getcwd() + kwargs['initial_model_path'])

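After fit_model has run, the saved artifact can be sanity-checked outside the DAG. A minimal sketch - the relative path matches INITIAL_MODEL_PATH in initial_model_DAG.py and assumes the script runs from the Airflow working directory:

# Sanity check of the model file written by fit_model (illustrative only).
import numpy as np
from keras.models import load_model

model = load_model('models/current_model/initial_model.H5')
dummy = np.zeros((1, 28, 28, 1), dtype='float32')  # one blank 28x28 image
probs = model.predict(dummy)                       # shape (1, 10)
print('predicted digit:', probs.argmax(axis=1)[0])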
114
dags/src/models/update_functions.py 100644

@@ -0,0 +1,114 @@
import keras
from keras.models import Sequential, load_model
import os
import time
import logging
import mlflow.keras


def load_current_model(model_path, file_m):
    model = load_model(os.getcwd() + model_path + str(file_m))
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=keras.optimizers.Adadelta(),
                  metrics=['accuracy'])
    return model


def update_model(**kwargs):
    ti = kwargs['ti']
    loaded = ti.xcom_pull(task_ids='preprocessing')
    logging.info('variables successfully fetched from previous task')
    new_samples = loaded[0]
    test_set = loaded[1]

    # load the new samples
    x_new = new_samples[0]
    y_new = new_samples[1]
    y_new = keras.utils.to_categorical(y_new, kwargs['num_classes'])

    # load the test set
    x_test = test_set[0]
    y_test = test_set[1]
    y_test = keras.utils.to_categorical(y_test, kwargs['num_classes'])

    # get the current model
    for file_m in os.listdir(os.getcwd() + kwargs['path_current_model']):
        if 'H5' in file_m:
            mlflow.set_tracking_uri('http://mlflow:5000')
            with mlflow.start_run():
                model = load_current_model(kwargs['path_current_model'], file_m)

                # get the score of the current model
                current_score = model.evaluate(x_test, y_test, verbose=0)

                # update the model with the new data and evaluate its score
                model.fit(x_new, y_new,
                          batch_size=kwargs['batch_size'],
                          epochs=kwargs['epochs'],
                          verbose=1,
                          validation_data=(x_test, y_test))
                updated_score = model.evaluate(x_test, y_test, verbose=0)

                # log the results to MLflow
                mlflow.log_metric('Epochs', kwargs['epochs'])
                mlflow.log_metric('Batch size', kwargs['batch_size'])
                mlflow.log_metric('test accuracy - current model', current_score[1])
                mlflow.log_metric('test accuracy - updated model', updated_score[1])
                mlflow.log_metric('loss - current model', current_score[0])
                mlflow.log_metric('loss - updated model', updated_score[0])
                mlflow.log_metric('Number of new samples used for training', x_new.shape[0])

                # if the updated model outperforms the current model -> move the current version to the archive and promote the updated model
                if updated_score[1] - current_score[1] > 0:
                    logging.info('Updated model stored')
                    mlflow.set_tag('status', 'the model from this run replaced the current version')
                    updated_model_name = 'model_' + str(time.strftime("%Y%m%d_%H%M"))
                    model.save(os.getcwd() + kwargs['path_current_model'] + updated_model_name + '.H5')
                    os.rename(os.getcwd() + kwargs['path_current_model'] + file_m, os.getcwd() + kwargs['path_model_archive'] + file_m)
                else:
                    logging.info('Current model maintained')
                    mlflow.set_tag('status', 'the model from this run did not replace the current version')
        else:
            logging.info(file_m + ' is not a model')


def data_to_archive(**kwargs):
    # store the data that was used for updating the model in the archive, tagged with date + time
    for file_d in os.listdir(os.getcwd() + kwargs['path_new_data']):
        if 'new_samples.p' in file_d:
            os.rename(os.getcwd() + kwargs['path_new_data'] + file_d, os.getcwd() + kwargs['path_used_data'] + file_d)
            logging.info('data used for updating the model has been moved to the archive')
        else:
            print('no data found')

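update_model logs its current-vs-updated comparison to the MLflow server addressed as mlflow:5000 inside the compose network. From the host, the same run data can be read back over the published port through the tracking API - a sketch assuming the default experiment (id '0') and the mlflow 1.x client:

# Read back the metrics and status tag logged by update_model (illustrative).
import mlflow
from mlflow.tracking import MlflowClient

mlflow.set_tracking_uri('http://localhost:5000')  # host-side port mapping
client = MlflowClient()
run = client.search_runs(experiment_ids=['0'], max_results=1)[0]  # one logged run
print(run.data.metrics.get('test accuracy - updated model'),
      run.data.tags.get('status'))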
BIN
dags/src/preprocessing/.DS_Store vendored 100644

Binary file not shown.

18
dags/src/preprocessing/preprocessing_functions.py 100644

@@ -0,0 +1,18 @@
import logging


def preprocessing(**kwargs):
    ti = kwargs['ti']
    loaded = ti.xcom_pull(task_ids='load_data')
    logging.info('variables successfully fetched from previous task')
    new_samples = loaded[0]
    test_set = loaded[1]

    # Once the new data is loaded we could do some preprocessing here and pass the preprocessed variables on
    # This section is left open as no further preprocessing is required
    logging.info('preprocessed the data')
    return [new_samples, test_set]

32
dags/stream_DAG.py 100755

@@ -0,0 +1,32 @@
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from src.data.kafka_producer import generate_stream

PATH_STREAM_SAMPLE = "/data/stream_sample.p"

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),  # this in combination with catchup=False ensures that the DAG is triggered from the current date onwards at the set interval
    'provide_context': True,  # set to True as we want to pass variables on from one task to another
}

dag = DAG(
    dag_id='stream_DAG',
    default_args=args,
    schedule_interval='@hourly',  # set interval
    catchup=False,  # indicate whether or not Airflow should do any runs for intervals between the start_date and the current date that haven't been run thus far
)

task1 = PythonOperator(
    task_id='generate_stream',
    python_callable=generate_stream,  # function to be executed
    op_kwargs={'path_stream_sample': PATH_STREAM_SAMPLE},  # input arguments
    dag=dag,
)

87
dags/update_DAG.py 100755

@@ -0,0 +1,87 @@
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from src.data.data_functions import get_data_from_kafka, load_data
from src.models.update_functions import update_model, data_to_archive
from src.preprocessing.preprocessing_functions import preprocessing

CLIENT = 'kafka:9092'
TOPIC = 'TopicA'

PATH_NEW_DATA = '/data/to_use_for_model_update/'
PATH_USED_DATA = '/data/used_for_for_model_update/'
PATH_TEST_SET = '/data/test_set.p'
PATH_INITIAL_MODEL = '/models/initial_model'
PATH_CURRENT_MODEL = '/models/current_model/'
PATH_MODEL_ARCHIVE = '/models/archive/'

BATCH_SIZE = 128
NUM_CLASSES = 10
EPOCHS = 1

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),  # this in combination with catchup=False ensures that the DAG is triggered from the current date onwards at the set interval
    'provide_context': True,  # set to True as we want to pass variables on from one task to another
}

dag = DAG(
    dag_id='update_DAG',
    default_args=args,
    schedule_interval='@daily',  # set interval
    catchup=False,  # indicate whether or not Airflow should do any runs for intervals between the start_date and the current date that haven't been run thus far
)

task1 = PythonOperator(
    task_id='get_data_from_kafka',
    python_callable=get_data_from_kafka,  # function called to get data from the Kafka topic and store it
    op_kwargs={'path_new_data': PATH_NEW_DATA,
               'client': CLIENT,
               'topic': TOPIC},
    dag=dag,
)

task2 = PythonOperator(
    task_id='load_data',
    python_callable=load_data,  # function called to load the data for further processing
    op_kwargs={'path_new_data': PATH_NEW_DATA,
               'path_test_set': PATH_TEST_SET},
    dag=dag,
)

task3 = PythonOperator(
    task_id='preprocessing',
    python_callable=preprocessing,  # function called to preprocess the data
    op_kwargs={},
    dag=dag,
)

task4 = PythonOperator(
    task_id='update_model',
    python_callable=update_model,  # function called to update the model
    op_kwargs={'num_classes': NUM_CLASSES,
               'epochs': EPOCHS,
               'batch_size': BATCH_SIZE,
               'path_current_model': PATH_CURRENT_MODEL,
               'path_model_archive': PATH_MODEL_ARCHIVE,
               },
    dag=dag,
)

task5 = PythonOperator(
    task_id='data_to_archive',
    python_callable=data_to_archive,  # function called to archive the data used for updating the model
    op_kwargs={'path_new_data': PATH_NEW_DATA,
               'path_used_data': PATH_USED_DATA,
               },
    dag=dag,
)

task1 >> task2 >> task3 >> task4 >> task5  # set task priority

BIN
data/.DS_Store vendored 100644

Binary file not shown.

0
data/__init__.py 100644

BIN
data/stream_sample.p 100644

Binary file not shown.

BIN
data/test_set.p 100644

Binary file not shown.

59
docker-compose-project.yml 100644

@@ -0,0 +1,59 @@
version: '3.7'
services:
  postgres: # create the postgres container
    image: postgres:9.6
    container_name: postgres_container
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

  airflow: # create the airflow container
    build: './airflow_docker' # build the container along the Dockerfile in this folder
    container_name: airflow_container
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    volumes: # mount the following local folders
      - ./dags:/usr/local/airflow/dags
      - ./data:/usr/local/airflow/data
      - ./models:/usr/local/airflow/models
    ports:
      - "8080:8080" # expose port
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  zookeeper: # create the zookeeper container
    image: wurstmeister/zookeeper
    container_name: zookeeper_container
    ports:
      - "2181:2181" # expose port

  kafka: # create an instance of a Kafka broker in a container
    image: wurstmeister/kafka
    container_name: kafka_container
    ports:
      - "9092:9092" # expose port
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka # specify the docker host name at which other containers can reach the broker
      KAFKA_CREATE_TOPICS: "TopicA:1:1" # create a topic called 'TopicA' with 1 partition and 1 replica
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # specify where the broker can reach Zookeeper
      KAFKA_LISTENERS: PLAINTEXT://:9092 # the list of addresses on which the Kafka broker listens for incoming connections
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 # Kafka sends this value to clients during their connection; clients then use it to produce/consume records to/from the broker
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  mlflow: # create an MLflow container
    build: './mlflow_docker' # build the container along the Dockerfile in this folder
    container_name: mlflow_container
    ports:
      - "5000:5000" # expose port
    command: 'mlflow server --backend-store-uri ./mlflow --host 0.0.0.0 --port 5000'

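Because KAFKA_ADVERTISED_LISTENERS hands clients the hostname kafka, the broker must be addressed as kafka:9092 from inside the compose network - which is exactly what the producer and consumer code above do. A quick reachability probe, meant to run in a container attached to the same network (illustrative only):

# Probe the broker from inside the compose network (illustrative only).
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['kafka:9092'],
                         consumer_timeout_ms=3000)
print('TopicA' in consumer.topics())  # True once KAFKA_CREATE_TOPICS has taken effect
consumer.close()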
5
mlflow_docker/Dockerfile 100644

@@ -0,0 +1,5 @@
FROM python:3.7.0
RUN pip install mlflow==1.2.0
EXPOSE 5000

BIN
models/.DS_Store vendored 100644

Binary file not shown.

BIN
models/archive/.DS_Store vendored 100644

Binary file not shown.

Binary file not shown.

BIN
models/current_model/.DS_Store vendored 100644

Binary file not shown.

Binary file not shown.