Show and Tell - multi-rq - do multiprocessing from gunicorn with RQ when deploying Dash

I have some callback functions that take forever. I was already using RQ as a task queue, but some things were still taking too long because they need real processing power, and you can't call multiprocessing.Pool.apply_async from gunicorn when using eventlet workers… so I made multi-rq to do the same thing with RQ.

It works a little differently: you don't create a list of result objects and call .get() on them, and unlike plain RQ you don't have to poll the jobs yourself until they finish. Instead, you just provide the function to apply and a list of argument iterables. You can optionally provide your own completion-checking and result-processing functions.

Basic use

# basic_test.py
import time

def wait(i, j):
    time.sleep(.1)
    print(i)
    return sum((i, j))
import rq
from multi_rq import MultiRQ
from basic_test import wait

mrq = MultiRQ()
nums = [(i,j) for i,j in zip(range(10),range(10))] 
mrq.apply_async(wait, nums, mode='results')
# >>> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# mode 'results' means you get the results of the jobs; 'jobs' means you get the jobs themselves.
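
One thing the snippet doesn't show: as with plain RQ, nothing actually runs until at least one RQ worker is listening on the queue, and the function you apply (wait here) has to live in a module the workers can import - which is why it sits in basic_test.py rather than inline. Starting workers is standard RQ, nothing multi-rq specific; a minimal worker script looks roughly like this (assuming Redis on localhost and the default queue - adjust if your setup differs):

# worker.py - a plain RQ worker; start several of these to get real parallelism
import redis
from rq import Worker, Queue, Connection

conn = redis.Redis()  # default localhost:6379

with Connection(conn):
    Worker([Queue('default')]).work()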

Use with Dash
Example: Doing a big processing task asynchronously in chunks of 5000

import pandas as pd
from multi_rq import MultiRQ

def df_process_func(df, my_arg):
    # do something with a df chunk
    return df

@app.callback(...)
def do_this(...):
    mrq = MultiRQ()
    df = pd.read_csv('large_data.csv')
    # one (chunk, arg) tuple of positional args per job, 5000 rows at a time
    chunks = [(df.iloc[i:i+5000], 'arg') for i in range(0, df.shape[0], 5000)]

    # a list of the outputs of df_process_func, one per chunk
    output = mrq.apply_async(df_process_func, chunks)

    # put them all back together, for example
    df = pd.concat(output)
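
One RQ detail that applies here as well: the workers have to be able to import df_process_func by its module path, so it can't be defined inside the callback function itself, and if the app module isn't importable by the workers, move it into its own small module. Roughly (tasks.py is just an illustrative name):

# tasks.py - any module the RQ workers can also import
def df_process_func(df, my_arg):
    # the heavy per-chunk work goes here
    return df

# in the Dash app
from tasks import df_process_func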

This is a little more about web development than about Dash, but it’s an issue I ran into when trying to learn how to deploy Dash. I welcome feedback, issues, and pull requests. :rocket:


Update:

Release v0.2.1

Basically, this release adds kwargs support and makes both args and kwargs optional (you can pass them positionally or by keyword).

Kwargs can be:

  • a single dict to be applied to each job (can be empty)
  • a list/tuple of length N unique dicts to be applied to each job respectively

Args can be:

  • an iterator of length 0 (default)
  • an iterator of length 1, with the single arg to be applied to all jobs
  • an iterator of length N of unique args iterators to be applied to each job respectively

Type checking is built in so you don’t get hanging processes.

import rq
from multi_rq import MultiRQ

from basic_test import wait
# import time
# def wait(i,j):
#    time.sleep(.1)
#    return sum((i,j))

mrq = MultiRQ()
nums = [(i,j) for i,j in zip(range(0,20,2), range(11,21))]

# any of these:
mrq.apply_async(wait, args=nums)
mrq.apply_async(wait, nums)
mrq.apply_async(wait, kwargs=[{'i': x[0], 'j': x[1]} for x in nums])
# >>> [11, 14, 17, 20, 23, 26, 29, 32, 35, 38]
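
The two shapes from the lists above that this example doesn't cover - a single shared kwargs dict and a length-1 args iterator - would look roughly like this (a sketch written from those rules, not pasted from a run):

# one dict applied to every job, unique positional args per job
mrq.apply_async(wait, args=[(i,) for i in range(10)], kwargs={'j': 100})

# one set of positional args applied to every job, unique kwargs per job
mrq.apply_async(wait, args=[(0,)], kwargs=[{'j': j} for j in range(10)])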

This makes multi-rq way more useful for me - I hope it does the same for you.