
Asynchronous Machine Learning in Flask

June 29, 2020

Training a machine learning model can be a complicated and time-consuming task, yet clients value work completed quickly and professionally. One way to achieve both when you need to train a machine learning model is to use an asynchronous programming approach.

Asynchronous programming is a form of parallel programming in which a unit of work runs separately from the primary application thread and notifies the main thread of its completion or failure. Implementing this approach brings many benefits, such as improved application performance and enhanced responsiveness. Most importantly, it significantly cuts down the time spent waiting on machine learning training: models can be trained in the background while you use that time for other tasks.
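
As a minimal illustration of the idea, here is a sketch using only the Python standard library (not the Celery setup we build below): a unit of work is submitted to a separate thread, and a callback fires when it completes or fails.

from concurrent.futures import ThreadPoolExecutor

def train_model():
    # Stand-in for a long-running training job
    return 'model trained'

def notify(future):
    # Runs once the submitted work finishes, successfully or not
    if future.exception() is not None:
        print('Training failed:', future.exception())
    else:
        print('Training finished:', future.result())

with ThreadPoolExecutor() as executor:
    future = executor.submit(train_model)
    future.add_done_callback(notify)
    # The main thread is free to do other work here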

This article will demonstrate how to implement an asynchronous programming approach on a project that requires training machine learning models. We will do that with the help of a Flask framework together with the Celery task scheduler and the SQS queue from Amazon Web Services (AWS).

The “What and How” of Our Solution

Before we move any further, let's unpack the terminology. Flask is a web framework that provides the tools, libraries and technologies necessary to build a web application. The Celery task scheduler performs complex tasks in the background for the Flask app. In our case, it will be used with Amazon SQS, a message queue that sends the necessary data to other processes (workers), which run the tasks in the background. Many other message queues can be used for this purpose, such as Kafka and RabbitMQ.

When building a Flask application that uses Celery to manage tasks asynchronously, you need to understand that there are three parts to consider besides the queue and the results of the internal processing. In particular, you need to consider a:

  • Flask instance, which is your web or micro-service front-end;
  • Celery instance, which feeds tasks to the queue;
  • Celery worker, which pulls tasks off the queue and completes the work.

The Flask and Celery instances are deployed together and work in tandem at the interface of the application. The Celery worker is deployed separately and operates independently.

Setting Up the Three Components of the Flask and Celery Services

To avoid circular imports, both factories.celery_instance and tasks.tasks import only the Celery instance placeholder. The placeholder gets its configuration from the instance factory at runtime.

To implement our solution, we will create a project similar in structure to the one below:

application.py
worker.py
celery_holder.py
factories/
    flask_instance.py
    celery_instance.py
blueprints/
    controller.py
tasks/
    tasks.py

Here `application.py` is our application that initializes the Flask instance and Celery instance to provide the part of the web/micro-service that we interact with. The Celery placeholder is provided in `celery_holder.py`, and the Celery worker is initialized through `worker.py`. The application factories are provided in `factories/`, the Flask blueprint in `blueprints/`, and the Celery tasks in `tasks/`. Now, the import pattern will look something like this:
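
application.py -> factories/flask_instance.py -> blueprints/controller.py -> tasks/tasks.py -> celery_holder.py
application.py -> factories/celery_instance.py -> celery_holder.py

worker.py imports the same two factories. Since tasks.tasks and factories.celery_instance both import only the celery_holder placeholder, the chain never loops back on itself.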

To help you better understand how to implement such an approach, we provide code examples below. Starting from the bottom of the import pattern, we first show the Celery placeholder, celery_holder.py. It holds a bare Celery instance that the tasks file and the Celery factory can import and reference, in decorators as well as when extending the Celery class.

# celery_holder.py
from celery import Celery

# Bare instance; 'include' tells the worker where to find the task module
celery = Celery(__name__, include=['tasks.tasks'])

Next, here is a Celery task that trains a machine learning model.

# tasks/tasks.py
from celery_holder import celery

from sklearn.linear_model import LogisticRegression


@celery.task
def celery_long_task(dataset, target):
    # Train in the background and return the model's accuracy on the data
    logistic_regr = LogisticRegression().fit(dataset, target)
    return logistic_regr.score(dataset, target)

That task is called within an endpoint defined in a Flask blueprint. The dataset and target URLs below are placeholders we introduce for the example; point them at your own data.

# blueprints/controller.py
from flask import Blueprint

from tasks.tasks import celery_long_task

import pandas as pd


# Placeholder URLs for the training data
DATASET_URL = 'https://example.com/dataset.csv'
TARGET_URL = 'https://example.com/target.csv'

sqs = Blueprint('sqs', __name__)


@sqs.route('/celery/', methods=['GET'])
def add_celery_task():
    # Note: Celery's default JSON serializer cannot handle DataFrames;
    # enable the pickle serializer or pass URLs/paths to the task instead
    dataset, target = pd.read_csv(DATASET_URL), pd.read_csv(TARGET_URL)
    celery_long_task.delay(dataset, target)
    return 'Dataset and target queued'
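
The endpoint above fires and forgets. If you also want to let clients poll a task's progress, a minimal sketch follows. This is our addition rather than part of the original setup: it assumes a Celery result backend is configured (the SQS-only configuration shown later does not include one), and it assumes add_celery_task returns the id of the queued task (task = celery_long_task.delay(...); return task.id).

from flask import jsonify
from celery.result import AsyncResult

import celery_holder


@sqs.route('/celery/status/<task_id>/', methods=['GET'])
def get_task_status(task_id):
    # Look up the task's current state (PENDING, STARTED, SUCCESS, FAILURE, ...)
    result = AsyncResult(task_id, app=celery_holder.celery)
    return jsonify({'task_id': task_id, 'state': result.state})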

Then we go to the factory that creates a Flask instance with all the necessary configuration specified. We assume here that the Celery settings shown later live in a config module that the factory loads; adjust the import to match your project.

# factories/flask_instance.py
from flask import Flask

from blueprints.controller import sqs


def create_app() -> Flask:
    app = Flask(__name__)

    # Load configuration, including the Celery/SQS settings shown below
    app.config.from_object('config')

    # Register blueprints
    app.register_blueprint(sqs, url_prefix='/sqs')

    return app

After that we describe the Celery factory, which configures the placeholder instance from the Flask app's config and makes every task run inside the Flask application context.

# factories/celery_instance.py
import celery_holder
from flask import Flask
from celery import Celery


def configure_celery(app: Flask) -> Celery:
    """Configure celery instance using config from Flask app"""
    TaskBase = celery_holder.celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_holder.celery.conf.update(app.config)
    celery_holder.celery.Task = ContextTask

    return celery_holder.celery

Finally, we have reached the two files that launch our application. The first, application.py, assembles the full app:

# application.py
from factories.flask_instance import create_app
from factories.celery_instance import configure_celery

# Imported for type hinting
from flask import Flask
from celery import Celery


def create_full_app() -> Flask:
    app: Flask = create_app()
    cel_app: Celery = configure_celery(app)
    return app

The second, worker.py, launches our worker:

# worker.py
from factories.flask_instance import create_app
from factories.celery_instance import configure_celery

# Imported for type hinting
from flask import Flask
from celery import Celery

app: Flask = create_app()
celery: Celery = configure_celery(app)

To run the Flask application, set FLASK_APP=application:create_full_app() (use export instead of set on Unix-like systems) and start it with flask run.

To run the Celery worker, point to worker.celery in the --app parameter of the celery worker command, for example: celery --app=worker.celery worker.

How to Configure These Components

Configuring SQS as the background task messaging queue for Celery requires an absolute minimum of steps. Really, all you need for Celery to recognize SQS as a queue is to specify the correct BROKER_URL and pass the queue name to CELERY_DEFAULT_QUEUE. These settings live in the config module loaded by the Flask factory above:

# config.py: Celery configuration loaded by the Flask factory
SQS_NAME = 'test'

BROKER_URL = 'sqs://'  # AWS credentials are taken from the environment
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'polling_interval': 5,  # number of seconds to sleep between polls
    'wait_time_seconds': 5
}
CELERY_DEFAULT_QUEUE = SQS_NAME
CELERY_ENABLE_REMOTE_CONTROL = False
CELERY_SEND_EVENTS = False
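
With the bare 'sqs://' broker URL, the transport picks up AWS credentials from the environment or an attached IAM role. If you prefer explicit credentials, Celery also accepts them embedded in the broker URL; they must be URL-encoded. A quick sketch, where the key values are placeholders:

from urllib.parse import quote

AWS_ACCESS_KEY_ID = '...'      # placeholder
AWS_SECRET_ACCESS_KEY = '...'  # placeholder

BROKER_URL = 'sqs://{}:{}@'.format(
    quote(AWS_ACCESS_KEY_ID, safe=''),
    quote(AWS_SECRET_ACCESS_KEY, safe=''),
)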

These are the main concepts behind the proposed architecture, which you need to know to implement such an approach. Asynchronous machine learning can be convenient in big analytical projects for industries such as stock markets and gambling, yet its benefits are not limited to those. Any project that aims to cut down time and optimize processes while training machine learning models can thrive with such a solution.

References

Overview of Celery with Flask: https://blog.miguelgrinberg.com/post/using-celery-with-flask

by Nikita Lazaryonok


Nikita is a Data Scientist with 2+ years of experience. He has a keen interest in classical machine learning algorithms and data analysis.