source: main/trunk/openPLM/plmapp/tasks.py @ 471

Revision 471, 2.1 KB checked in by pcosquer, 9 years ago (diff)

csv: work on #46
this commit enables mails sending/index updating after the database commit
a new task (update_indexes) is available to update several indexes at once

Line 
1###########################
2# adapted from https://github.com/mixcloud/django-celery-haystack-SearchIndex/
3# by sdcooke
4
5from functools import wraps
6
7from django.db.models.loading import get_model
8
9from haystack import site
10
11from celery.task import task
12
def synchronized(cls):
    """Class decorator to synchronize execution of a task's run method.

    This prevents parallel execution of two instances of the same task within
    the same worker. If an instance of the same task is already running in
    the same worker, the second invocation calls
    :meth:`~celery.task.base.Task.retry` instead of running the task.

    Note that this applies to the task class, so `@synchronized` should
    appear before `@task` or `@periodic_task` when tasks are defined with
    decorators.

    .. code-block:: python

        @synchronized
        @task
        def cleanup_database(**kwargs):
            logger = cleanup_database.get_logger(**kwargs)
            logger.warn("Task running...")

    :param cls: the task to protect; it must expose ``run`` and ``retry``
    :return: *cls* itself. Its ``run`` is replaced by a locking wrapper, the
        original ``run`` stays reachable as ``unsynchronized_run`` and the
        lock is stored as ``lock``.
    """
    from multiprocessing import Lock
    cls.lock = Lock()
    cls.unsynchronized_run = cls.run

    @wraps(cls.unsynchronized_run)
    def wrapper(*args, **kwargs):
        # Non-blocking acquire: if the lock is already held, another run of
        # this task is in progress, so hand the call over to retry() instead
        # of waiting for the lock.
        if not cls.lock.acquire(False):
            return cls.retry(args=args, kwargs=kwargs)
        try:
            # Return the result of the wrapped run so that synchronized
            # tasks keep their return value.
            return cls.unsynchronized_run(*args, **kwargs)
        finally:
            # Always release the lock, even when the task raises.
            cls.lock.release()

    cls.run = wrapper
    return cls
47
@synchronized
@task(default_retry_delay=5 * 60, max_retries=1)
def update_index(app_name, model_name, pk, **kwargs):
    """Update the search index entry of a single model instance.

    The model class is looked up from *app_name* and *model_name*, the
    instance is fetched by its primary key *pk*, and the index registered
    for that model is asked to update the object.

    Extra keyword arguments are accepted and ignored. The task is declared
    with a retry delay of five minutes and at most one retry.
    """
    model = get_model(app_name, model_name)
    # Fetch the object before resolving the index.
    obj = model.objects.get(pk=pk)
    site.get_index(model).update_object(obj)
55
@synchronized
@task(default_retry_delay=5 * 60, max_retries=1)
def update_indexes(instances):
    """Update the search index entries of several model instances at once.

    *instances* is an iterable of ``(app_name, model_name, pk)`` triples.
    Each triple is processed in order: the model class is looked up, the
    instance is fetched by primary key, and the index registered for that
    model is asked to update the object.

    The task is declared with a retry delay of five minutes and at most
    one retry.
    """
    for app_label, model_label, key in instances:
        model = get_model(app_label, model_label)
        # Fetch the object before resolving the index.
        obj = model.objects.get(pk=key)
        site.get_index(model).update_object(obj)
64
@task
def add(a, b):
    u"""Return ``a + b``; a trivial task used to check that the queue works."""
    total = a + b
    return total
69
Note: See TracBrowser for help on using the repository browser.