(DRAFT) Scaling a Django project using Celery and Elasticsearch

Scalability has become an important concern not only for large software projects but also for mid-sized ones. A quick look at developer job postings shows that the ability to build scalable software is among the most sought-after skills.

Making software scalable usually requires more than one methodology and more than one technology stack. At first this may look difficult and seem to demand deep experience with those methods and stacks, and scalability is indeed a serious topic. Even so, in this post I will show how to make a Django application more scalable, and along the way you will learn the fundamental concepts and approaches.

PLAN

  1. Create a Django project: a small product management system.
  2. Make adding a product asynchronous instead of synchronous.
  3. Sync Elasticsearch with the primary database (SQLite).
  4. Use Elasticsearch for autocomplete and real-time analysis.

1. Product management app with Django

Since I assume you are familiar with Python and Django, I will not go into every detail of building a Django app. Instead, I will cover only the steps specific to this tutorial.

Start a Django project and an app named `products`.

First, define a product model as follows:
# products/models.py
from django.db import models


class Product(models.Model):
    # (value, label) pairs; new products start as "processing"
    STATES = [
        (1, "active"),
        (0, "inactive"),
        (2, "processing"),
    ]
    status = models.SmallIntegerField(choices=STATES, default=STATES[2][0])
    title = models.CharField(max_length=100, blank=False)
    image_url = models.CharField(max_length=255, blank=False)

...and the corresponding form:
# products/forms.py
from django import forms

from .models import Product


class ProductForm(forms.ModelForm):
    class Meta:
        model = Product
        fields = ["title", "image_url"]

Define the following view for your product:
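# products/views.py
from time import sleep, time

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

from .forms import ProductForm


def do_other_slow_work(product):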
    image_url = product.image_url
    # fetch,
    # process,
    # create some thumbnails,
    # generate caches
    # ... and other arbitrary heavy operations.

    # let's say they take 5 seconds on average
    sleep(5)

    return "slow work is done."


@csrf_exempt
def add_product(request):
    if request.method != "POST":
        return HttpResponse("only adding new products is allowed")

    start = time()
    form = ProductForm(request.POST)
    if form.is_valid():
        product = form.save()
        # do some slow, resource heavy task here.
        do_other_slow_work(product)

    end = time()
    took = end - start
    return HttpResponse(
        "all things are done. {0:.3f} seconds took. ".format(took))

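Then add this view to Django's URL configuration:
# products/urls.py
# Note: url() was removed in Django 4.0; on Django 2.0+ use re_path() instead.
from django.conf.urls import url

from . import views

urlpatterns = [
    url(r"", views.add_product),
]
# mysite/urls.py
from django.conf.urls import include, url
from django.contrib import admin

urlpatterns = [
    url(r"^products/", include("products.urls")),
    url(r"^admin/", admin.site.urls),
]
We are now done with our very basic Django app, which accepts POST requests containing a valid product title and image_url. Let's test it with curl.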
(venv)➜  mysite curl -XPOST 0.0.0.0:8000/products/ -d "title=p2&image_url=4"
all things are done. 5.006 seconds took.
(venv)➜  mysite
As you can see, our application performs all the operations synchronously, blocking the response until they finish. The duration will vary from app to app, but even a 2-second wait is annoying: we want the application to stay responsive. This leads us to run the slow, resource-heavy operations asynchronously, so that the client gets a response immediately saying that its request is valid and will be processed later. This is where Celery helps: it runs operations asynchronously, and on a schedule if desired.
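To make the difference concrete before bringing in Celery, here is a minimal sketch that uses only the standard library. It is not how Celery works internally; a thread pool merely stands in for a background worker. The names `slow_work`, `handle_blocking`, and `handle_deferred` are made up for this illustration, and the 5-second job is shortened to 0.2 seconds.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SLOW_SECONDS = 0.2  # stand-in for the 5-second job, shortened for the demo


def slow_work(title):
    """Pretend to fetch and process an image."""
    time.sleep(SLOW_SECONDS)
    return "slow work is done for {}".format(title)


def handle_blocking(title):
    """Do the slow work before answering, like the synchronous view."""
    start = time.monotonic()
    slow_work(title)
    return time.monotonic() - start


def handle_deferred(title, executor):
    """Hand the slow work to a background worker and answer right away."""
    start = time.monotonic()
    future = executor.submit(slow_work, title)
    return time.monotonic() - start, future


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        blocking_took = handle_blocking("p1")
        deferred_took, future = handle_deferred("p2", executor)
        print("blocking: {:.3f}s".format(blocking_took))
        print("deferred: {:.3f}s".format(deferred_took))
        print(future.result())  # the work still completes, just later
```

The deferred handler returns almost immediately because it only schedules the work. A task queue such as Celery applies the same idea, but moves the work out of the web process entirely.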

2. Evolving to asynchronous operations by using Celery

First, you need to define the Celery app. Celery needs a message broker, and the most commonly used ones are Redis and RabbitMQ. We will use Redis for the sake of simplicity. However, Redis may not be a good choice in production, so please read about each broker's pros and cons before choosing one. I will not go deeper into how Celery works or how to optimize it; please refer to the Celery documentation for more information.
# mysite/celery.py
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')

from django.conf import settings  # noqa

app = Celery('mysite')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
Add the following import to `mysite/__init__.py` to make the Celery app instance defined above available throughout the project:
# mysite/__init__.py
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa
Lastly, add these Celery directives to the settings file:
# mysite/settings.py
# CELERY SETTINGS
BROKER_URL = "redis://0.0.0.0/0"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_ENABLE_UTC = True
Now our Celery app instance is ready and we can define asynchronous operations.
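The setting names above follow the older convention in which the broker setting is `BROKER_URL`. If you are on Celery 4.0 or later, the documented Django integration loads settings under a `CELERY_` namespace instead, and the broker setting becomes `CELERY_BROKER_URL`. The fragment below is a sketch of that variant. It assumes that `mysite/celery.py` calls `app.config_from_object("django.conf:settings", namespace="CELERY")` and that Redis listens on `localhost:6379`, neither of which is part of the setup shown in this post.

```python
# mysite/settings.py (Celery 4.0+ naming; assumes mysite/celery.py calls
# app.config_from_object("django.conf:settings", namespace="CELERY"))
CELERY_BROKER_URL = "redis://localhost:6379/0"  # assumed local Redis, database 0
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_ENABLE_UTC = True
```

Only the broker setting changes its name here; the other names already carry the `CELERY_` prefix.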

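Let's first make the `do_other_slow_work` function asynchronous. To do so, we decorate the function with Celery's `@shared_task` decorator and invoke it as `do_other_slow_work.delay(...)` instead of `do_other_slow_work(...)`. Because we configured JSON as the task serializer, we cannot pass the model instance itself, so we serialize the product before queuing it and deserialize it inside the task.
# products/views.py
from time import sleep, time

from celery import shared_task
from django.core import serializers
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

from .forms import ProductForm


@shared_task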
def do_other_slow_work(serialized_product):
    product = serializers.deserialize("json", serialized_product)
    product = list(product)[0].object

    image_url = product.image_url
    # fetch,
    # process,
    # create some thumbnails,
    # generate caches
    # ... and other arbitrary heavy operations.

    # let's say they take 5 seconds on average
    sleep(5)

    return "slow work is done."


@csrf_exempt
def add_product(request):
    if request.method != "POST":
        return HttpResponse("only adding new products is allowed")

    start = time()
    form = ProductForm(request.POST)
    if form.is_valid():
        product = form.save()
        # do some slow, resource heavy task here.
        serialized_product = serializers.serialize('json', [product])
        do_other_slow_work.delay(serialized_product)

    end = time()
    took = end - start
    return HttpResponse(
        "all things are done. {0:.3f} seconds took. ".format(took))
Start the Celery workers, which do the actual work by running the tasks:
(venv)➜  mysite celery -A mysite worker -l info
Then send a request to test the system:
(venv)➜  mysite: curl -XPOST 0.0.0.0:8000/products/ -d "title=p2&image_url=4"
all things are done. 0.092 seconds took.
(venv)➜  mysite: 
As you can see, we did not have to wait 5 seconds for a response. The `do_other_slow_work.delay(...)` call puts a task into Celery's queue to be run later. Celery's workers watch the queues and run each new task as it becomes available. Celery can be tuned, for example by setting how many workers run in parallel. We may even want to run specific tasks on a remote machine or give some tasks priority.
At this point, we have taken a big step toward scaling the add-product operation. Under a heavy load of such operations, we can increase the number of Celery workers running in parallel.
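The effect of adding workers can be sketched with the standard library alone. This is a toy model, not Celery's implementation: threads play the role of workers, a `queue.Queue` plays the role of the broker, and `run_workers` is a hypothetical helper written for this illustration. Unlike real Celery workers, which keep waiting for new tasks, these workers exit once the queue is empty.

```python
import queue
import threading
import time


def run_workers(tasks, worker_count, task_seconds=0.1):
    """Process every task with `worker_count` threads; return (results, elapsed)."""
    pending = queue.Queue()
    for task in tasks:
        pending.put(task)

    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            try:
                task = pending.get_nowait()
            except queue.Empty:
                return  # queue drained, this worker exits
            time.sleep(task_seconds)  # stand-in for the slow work
            with results_lock:
                results.append("done: {}".format(task))

    start = time.monotonic()
    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results, time.monotonic() - start


if __name__ == "__main__":
    titles = ["p{}".format(i) for i in range(8)]
    _, one_worker = run_workers(titles, worker_count=1)
    _, four_workers = run_workers(titles, worker_count=4)
    print("1 worker:  {:.2f}s".format(one_worker))
    print("4 workers: {:.2f}s".format(four_workers))
```

With eight queued tasks, four workers finish in roughly a quarter of the time a single worker needs, which is the same lever we pull when we raise the number of Celery workers.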

A little bit more scalability

You may notice that inserting the product into the database still blocks the request, although the delay is negligible for now. However, since database transactions are IO operations, they will slow down the application under too many requests. In theory, it is a good idea to run all IO operations (network, database, file, etc.) asynchronously; in practice, do not forget Knuth's famous statement:
"We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%." -- Donald Knuth
To hand the product insertion over to a Celery worker, we make more or less the same modifications: we move the code into a separate function, decorate that function, and call it like `f.delay()`.
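# products/views.py
from time import sleep, time

from celery import shared_task
from django.core import serializers
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

from .forms import ProductForm


@shared_task()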
def do_other_slow_work(serialized_product):
    product = serializers.deserialize("json", serialized_product)
    product = list(product)[0].object

    image_url = product.image_url
    # fetch,
    # process,
    # create some thumbnails,
    # generate caches
    # ... and other arbitrary heavy operations.

    # let's say they take 5 seconds on average
    sleep(5)

    return "slow work is done."


@shared_task()
def save_product(request_data):
    # request_data is a plain dict so it survives JSON task serialization
    form = ProductForm(request_data)
    if not form.is_valid():
        return "invalid product data."
    product = form.save()
    # do some slow, resource heavy task here.
    serialized_product = serializers.serialize('json', [product])
    do_other_slow_work.delay(serialized_product)
    return "product is saved."


@csrf_exempt
def add_product(request):
    if request.method != "POST":
        return HttpResponse("only adding new products is allowed")

    start = time()
    form = ProductForm(request.POST)
    if form.is_valid():
        # QueryDict.dict() flattens the POST data into a plain dict
        save_product.delay(request.POST.dict())

    end = time()
    took = end - start
    return HttpResponse(
        "all things are done. {0:.3f} seconds took. ".format(took))
So far, we have put potentially slow code into task functions and run them with Celery workers that can work in parallel on different machines. That is, we now have the ability to scale the product insertion process.
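One detail worth keeping in mind: with JSON configured as the task serializer, every argument handed to `.delay()` has to survive a JSON round trip. The sketch below illustrates this with the standard library only. `to_task_payload` and `round_trip` are hypothetical helpers written for this example, and the form body is the one from the curl request used earlier.

```python
import json
from urllib.parse import parse_qs


def to_task_payload(raw_body):
    """Flatten a form-encoded body into a plain dict of strings.

    Hypothetical helper: it keeps the last value given for each field.
    """
    parsed = parse_qs(raw_body)  # maps each field name to a list of values
    return {key: values[-1] for key, values in parsed.items()}


def round_trip(payload):
    """Encode and decode the payload the way a JSON task serializer would."""
    return json.loads(json.dumps(payload))


if __name__ == "__main__":
    payload = to_task_payload("title=p2&image_url=4")
    print(round_trip(payload))

    try:
        json.dumps({"tags": {"a", "b"}})  # a set is not JSON serializable
    except TypeError as error:
        print("rejected:", error)
```

In a Django view, `request.POST.dict()` performs a comparable flattening of the submitted form data, which makes it a convenient shape to pass to a task.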

3. Sync Elasticsearch with the primary database (SQLite)

preparing...

4. Use Elasticsearch for autocomplete and real-time analysis

preparing...

Conclusions and takeaways

preparing...
