Multithreading problem with parallel tasks: streaming data and passing it into a Dash app

#1

Hi

I have an app with two modules.

module 1: app.py
module 2: streamer.py

streamer.py streams tweets from Twitter, and app.py takes some specific numbers from streamer.py and displays them in a Dash gauge chart. streamer.py works fine.

The problem is that I need two sessions running in parallel that interact with each other, but once streamer.py starts streaming, app.run_server(debug=True) never starts (run code after flask application has started). I could not figure out how to manage that in Dash.

I have read that Dash is thread-based and that parallel streaming of content to the Dash application is therefore not possible.

import twitter_streamer

hash_tag_list = ["trump"]
streamer = twitter_streamer.TwitterStreamer()
streamer.stream_tweets(hash_tag_list)

... SOME CODE...

if __name__ == '__main__':
    app.run_server(debug=True)

--> Result: streamer.py works and shows me some tweets, but app.py does not run

This does not work either; the streamer still blocks:

import twitter_streamer

... SOME CODE...

if __name__ == '__main__':
    hash_tag_list = ["trump"]
    streamer = twitter_streamer.TwitterStreamer()
    streamer.stream_tweets(hash_tag_list)
    app.run_server(debug=True)

--> Result: streamer.py works and shows me some tweets, but app.py does not run

Is there really no way to stream data in parallel to the Dash app?

#2

I think what is meant in that post is that starting a Dash app results in the Flask server blocking the rest of your script, since it is now listening for requests; it's not so much that Dash itself is threaded. You can run the Flask server that powers Dash with multiple worker processes and/or multiple threads. Multiple worker processes are usually what you want, though, since Python threads only give you concurrency for I/O-bound tasks.

I'd suggest that if you want to combine your Dash app and the streamer into the same Python program, you start the server in a new process using Python's multiprocessing module, and then continue defining the rest of your logic so it runs in the parent process. You can see an example of this in the Dash tests.
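A minimal sketch of that pattern (the names here are my own, not taken from the Dash tests): a blocking call, standing in for app.run_server(), is started in a child process so the parent stays free to run the streaming logic.

```python
import multiprocessing
import time


def serve(queue):
    # Stand-in for a blocking call such as app.run_server():
    # signal readiness, then block forever.
    queue.put("server started")
    while True:
        time.sleep(1)


def start_in_child():
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=serve, args=(queue,))
    proc.start()
    # The parent is not blocked; it can read the child's signal
    # and carry on with other work (e.g. the Twitter streamer).
    message = queue.get(timeout=5)
    proc.terminate()
    proc.join()
    return message


if __name__ == "__main__":
    print(start_in_child())
```

The parent keeps running after Process.start() returns, which is exactly what app.run_server() called directly does not allow.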

#3

Thank you @nedned. Multiprocessing is definitely new to me. I am not sure of the best way to implement it, so I tried it first with a simple program.

Should count.py (as the main code) call the app.py (Dash) module, or the reverse? I would say count.py calls app.py, because the main logic lives in count.py and the Dash app should be started separately, in a second process.

Therefore:

count.py

import app
import time

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

app.py

import dash
import dash_table
import dash_core_components as dcc
import dash_html_components as html
import dash_daq as daq
from dash.dependencies import Input, Output

import multiprocessing
import time

app = dash.Dash(__name__)

app.layout = html.Div([

    dcc.Interval(
        id='interval-component',
        interval=1*10000,  # in milliseconds
        n_intervals=0
    ),

    html.Div([
        daq.Gauge(
            id='gauge-chart',
            value=2,
            max=100,
            min=0,
            units="MPH",
        )
    ])
])


@app.callback(
    Output('gauge-chart', 'value'),
    [Input('interval-component', 'n_intervals')]
)
def update_gauge(n_intervals):
    value = 50
    return value


def startServer(self, dash):
    def run():
        dash.scripts.config.serve_locally = True
        dash.run_server(
            port=8050,
            debug=False,
            processes=4,
            threaded=False
        )

    # Run on a separate process so that it doesn't block
    self.server_process = multiprocessing.Process(target=run)
    self.server_process.start()
    time.sleep(0.5)

    # Visit the dash page
    self.driver.get('http://localhost:8050')
    time.sleep(0.5)


startServer(app)

But how do I call the startServer function (or pass the app to it)? This startServer(app) does not work :)

#4

In your startServer function you've kept a lot of code that was only relevant to the testing context, so you can clean it up a bit (you also don't need the self, as that is only relevant inside a class definition):

def start_server(app, **kwargs):
    def run():
        app.run_server(**kwargs)

    # Run on a separate process so that it doesn't block
    server_process = multiprocessing.Process(target=run)
    server_process.start()

One way you could organise this is to have start_server defined in a helper module say utils.py, then your main.py could look like this:

from utils import start_server
from app import app

start_server(app)

# then the rest of your twitter logic could go here
#...

You would start the app with:

$ python main.py
#5

How do we make this work with gunicorn? Because gunicorn won’t know about our start_server function, right?

#6

If you're using a WSGI server, it could be worth splitting your Dash app and the other app you want to run in parallel into two separate services and having them communicate via something like celery, or having your other app write data to a key-value store like redis, which your Dash app reads from. Once you start using gunicorn, squeezing everything into the one parent process can introduce more problems than it's worth. For example, telling gunicorn to run with multiple worker processes would result in each worker spawning its own child process doing the data processing.
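A sketch of that key-value handoff, using a JSON file on disk as a stand-in for redis so it runs without a redis server (with redis-py the two helpers would call r.set(...) and r.get(...) instead; the file name and function names are my own):

```python
import json
from pathlib import Path

STORE = Path("latest_value.json")  # stand-in for a redis key


def publish(value):
    # Called from the streaming process: overwrite the latest reading.
    # With redis this would be r.set("gauge", value).
    STORE.write_text(json.dumps({"gauge": value}))


def read_latest(default=0):
    # Called from the Dash callback: fetch whatever was written last.
    # With redis this would be r.get("gauge").
    if not STORE.exists():
        return default
    return json.loads(STORE.read_text())["gauge"]


# The interval callback would then become:
# @app.callback(Output('gauge-chart', 'value'),
#               [Input('interval-component', 'n_intervals')])
# def update_gauge(n_intervals):
#     return read_latest()
```

Because the two sides only share the store, the streamer and the Dash app can run as completely separate services under gunicorn, each with as many workers as needed.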

#7

I've tried the following code as you recommended. It is just a trivial counter, but run across multiple processes and displayed by the Dash app.

main.py

from utils import start_server
from app import app
import time

start_server(app)

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

utils.py

import multiprocessing

def start_server(app, **kwargs):
    def run():
        app.run_server(**kwargs)

    # Run on a separate process so that it doesn't block
    server_process = multiprocessing.Process(target=run)
    server_process.start()

app.py

import dash
import dash_table
import dash_core_components as dcc
import dash_html_components as html
import dash_daq as daq
from dash.dependencies import Input, Output

import time

app = dash.Dash(__name__)

app.layout = html.Div([

    dcc.Interval(
        id='interval-component',
        interval=1*10000,  # in milliseconds
        n_intervals=0
    ),

    html.Div([
        daq.Gauge(
            id='gauge-chart',
            value=2,
            max=100,
            min=0,
            units="MPH",
        )
    ])
])


@app.callback(
    Output('gauge-chart', 'value'),
    [Input('interval-component', 'n_intervals')]
)
def update_gauge(n_intervals):
    value = 50
    return value

Problem:
AttributeError: Can't pickle local object 'start_server.<locals>.run'
--> I am on a Windows machine, and it looks like Python multiprocessing behaves differently on Windows than on Linux or Mac --> RuntimeError on windows trying python multiprocessing

Adding if __name__ == "__main__": to main.py does not help either:

from utils import start_server
from app import app
import time

start_server(app)

counter = 0

while True:
    counter += 1
    time.sleep(1)
    print(counter)

if __name__ == "__main__":
    pass
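For what it's worth, the arrangement the linked answer points at usually looks like the sketch below on Windows (untested against this exact setup): the spawn start method pickles the process target, so the target must be a module-level function rather than a closure, and main.py's whole body, including the start_server(app) call, has to sit inside the if __name__ == "__main__": guard, not behind an empty guard at the end.

```python
# utils.py, restructured so the process target is picklable:
import multiprocessing


def _run(app, kwargs):
    # Module-level function: Windows' 'spawn' start method pickles the
    # process target, and a function defined inside start_server cannot
    # be pickled (hence "Can't pickle local object 'start_server.<locals>.run'").
    app.run_server(**kwargs)


def start_server(app, **kwargs):
    # Run on a separate process so that it doesn't block the caller.
    server_process = multiprocessing.Process(target=_run, args=(app, kwargs))
    server_process.start()
    return server_process


# main.py would then read:
# if __name__ == "__main__":
#     start_server(app)
#     counter = 0
#     while True:
#         counter += 1
#         time.sleep(1)
#         print(counter)
```

Note that under spawn the app object passed in args must itself be picklable; if it isn't, have _run import or create the Dash app inside the child process instead of receiving it as an argument.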