@thomas-riccardi
Last active October 17, 2018 12:19
Celery duplicate job execution related to `WorkerLostError: Worker exited prematurely: exitcode 155` on worker recycling


Scenario

```shell
# start everything
docker-compose up --build; docker-compose down
```

```shell
# in a second terminal: watch `celery` output
while sleep 1; do
  echo ""
  log=$(docker-compose logs worker)
  echo "MainProcess received task count uniqueness:"
  echo "${log}" | grep "Received task:" | awk '{print $8}' | sort | uniq -c | sort -g | awk '{print $1}' | uniq -c
  echo -n "WorkerLostError: "
  echo "${log}" | grep "raised error: WorkerLostError" | wc -l
  echo "Job execution uniqueness (job count per job execution count):"
  echo "${log}" | grep do_work: | awk '{print $6}' | sort | uniq -c | sort -g | awk '{print $1}' | uniq -c
done
```
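Each `sort | uniq -c | sort -g | awk '{print $1}' | uniq -c` pipeline above computes a count of counts: first how many times each job id appears in the log, then how many job ids share each appearance count. A minimal Python equivalent of that logic (the sample ids below are made up for illustration):

```python
from collections import Counter

def count_of_counts(ids):
    """Mirror `sort | uniq -c | sort -g | awk '{print $1}' | uniq -c`:
    map each execution count to the number of job ids having that count."""
    per_id = Counter(ids)            # executions observed per job id
    return Counter(per_id.values())  # job count per execution count

# hypothetical sample: job 7 executed twice, the others once
print(count_of_counts([1, 2, 3, 7, 7]))
```

A result of `{1: 3, 2: 1}` reads as "3 jobs ran once, 1 job ran twice", matching the shape of the `uniq -c` output below.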

Actual Result

```
MainProcess received task count uniqueness:
   1649 1
WorkerLostError: 125
Job execution uniqueness (job count per job execution count):
   1523 1
     11 2
```
  • The MainProcess received 1649 tasks, all unique, so the duplication does not come from the broker delivering duplicate messages.
  • The WorkerLostError exception was logged 125 times.
  • 11 jobs were executed twice.
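Since message delivery here is effectively at-least-once, the usual mitigation while the underlying bug stands is to make task bodies idempotent. The sketch below is not part of this reproduction and its names are hypothetical; a real deployment would key on a shared store (e.g. a Redis `SETNX` or a unique database constraint) rather than a per-process set, because duplicate deliveries land on different worker processes:

```python
# Hypothetical in-process idempotency guard for at-least-once delivery.
# A shared store is needed in practice; this only illustrates the idea.
_done = set()

def run_once(job_id, fn):
    """Run fn(job_id) at most once per job_id; skip redelivered duplicates."""
    if job_id in _done:
        return False  # duplicate delivery: skip
    fn(job_id)
    _done.add(job_id)
    return True

executed = []
for jid in [1, 2, 2, 3]:  # job 2 simulates a redelivery after a worker loss
    run_once(jid, executed.append)
```

With such a guard the doubly-delivered jobs would still be received twice, but only the first delivery would do work.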
client.py:

```python
import os
from tasks import app, work
from celery.backends.database import SessionManager

# to init db backend schema
session = SessionManager()
engine = session.get_engine(app.backend.url)
session.prepare_models(engine)

count = int(os.getenv('CLIENT_LOOP', 1000))
print('Loop creating work', count)
for i in range(0, count):
    print(i)
    work.delay(i)
print('Finished creating work', count)
```
docker-compose.yml:

```yaml
version: '2.3'
services:
  db:
    image: postgres:9.6
  rabbitmq:
    image: "rabbitmq:3.7-management"
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=password
  worker:
    build: .
    depends_on: [db, rabbitmq]
    environment:
      - CELERY_BROKER_URL=amqp://guest:password@rabbitmq:5672/
      - CELERY_RESULT_BACKEND=db+postgres://postgres:nopassword@db:5432/postgres
    command: ./wait-for-it.sh db:5432 -- ./wait-for-it.sh rabbitmq:5672 -- celery -A tasks worker --task-events --concurrency=64 --loglevel=DEBUG --max-tasks-per-child=1
  client:
    build: .
    depends_on: [db, rabbitmq]
    environment:
      - CELERY_BROKER_URL=amqp://guest:password@rabbitmq:5672/
      - CELERY_RESULT_BACKEND=db+postgres://postgres:nopassword@db:5432/postgres
      - CLIENT_LOOP=10000
    command: ./wait-for-it.sh db:5432 -- ./wait-for-it.sh rabbitmq:5672 -- python3 ./client.py
```
Dockerfile:

```dockerfile
FROM ubuntu:18.04

RUN apt-get update &&\
    apt-get install -y --no-install-recommends \
        git \
        wget \
        python3 python3-pip python3-setuptools python3-wheel &&\
    rm -rf /var/lib/apt/lists/* || :

WORKDIR /app
COPY requirements.txt .
RUN pip3 install -r requirements.txt --no-cache-dir
RUN wget https://github.com/vishnubob/wait-for-it/raw/master/wait-for-it.sh && chmod a+x wait-for-it.sh
COPY . .
```
requirements.txt:

```
#celery[sqlalchemy]==4.2.1
git+https://github.com/celery/celery.git@21baef5#egg=celery[sqlalchemy]
kombu==4.2.1
billiard==3.5.0.4
psycopg2-binary==2.7.4
```
tasks.py:

```python
from celery import Celery

app = Celery()

@app.task(name='work')
def work(i):
    print('do_work:{}'.format(i))
```