Changeset 586 in main


Ignore:
Timestamp:
11/29/11 12:28:16 (9 years ago)
Author:
pcosquer
Message:

tasks: update_index and update_indexes use the same lock

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/openPLM/plmapp/tasks.py

    r471 r586  
    1111from celery.task import task 
    1212 
    13 def synchronized(cls): 
     13def synchronized(cls=None, lock=None): 
    1414    """Class decorator to synchronize execution of a task's run method. 
    1515 
    1616    This prevents parallel execution of two instances of the same task within 
    1717    the same worker. If an instance of the same task is running in the same 
    18     worker, the second invocation calls :meth:`~celery.task.base.Task.retry` 
    19     is called instead of running the task. 
     18    worker, the second invocation blocks until the first one completes. 
    2019 
    2120    Note that this applies to the task class, so `@synchronized` should 
     
    3231    """ 
    3332    from multiprocessing import Lock 
    34     cls.lock = Lock() 
     33    cls.lock = lock or Lock() 
    3534    cls.unsynchronized_run = cls.run 
    3635    @wraps(cls.unsynchronized_run) 
    3736    def wrapper(*args, **kwargs): 
    38         if cls.lock.acquire(False): 
    39             try: 
    40                 cls.unsynchronized_run(*args, **kwargs) 
    41             finally: 
    42                 cls.lock.release() 
    43         else: 
    44             cls.retry(args=args, kwargs=kwargs) 
     37        with cls.lock: 
     38            cls.unsynchronized_run(*args, **kwargs) 
    4539    cls.run = wrapper 
    4640    return cls 
    4741 
    4842@synchronized 
    49 @task(default_retry_delay = 5 * 60, max_retries = 1) 
     43@task(default_retry_delay = 60, max_retries = 10) 
    5044def update_index(app_name, model_name, pk, **kwargs): 
    5145    model_class = get_model(app_name, model_name) 
     
    5448    search_index.update_object(instance) 
    5549 
    56 @synchronized 
    57 @task(default_retry_delay = 5 * 60, max_retries = 1) 
     50@task(default_retry_delay = 60, max_retries = 10) 
    5851def update_indexes(instances): 
    5952    for app_name, model_name, pk in instances: 
     
    6255        search_index = site.get_index(model_class) 
    6356        search_index.update_object(instance) 
     57update_indexes = synchronized(update_indexes, update_index.lock) 
    6458 
    6559@task 
Note: See TracChangeset for help on using the changeset viewer.