Show and tell: Super-fast way to share data between callbacks: Apache Plasma in-memory object store


I’ve been following the Arrow project and have messed around a bit with Apache Plasma as a shared-memory data backend. Has anyone else used it? Compared with reading large data from disk or caching large objects, it gives incredible performance boosts. I had been reading data in for each callback with Parquet (which is already fast). Switching to Plasma gave me write speedups of about 5x and read speedups of 10-50x, because although the data still gets serialized, it never leaves memory.

Docs: https://arrow.apache.org/docs/python/plasma.html#

Some of my Dash projects have all of these constraints:

  1. users interact with the data in resource-intensive ways
  2. data sizes are massive: between 10M and 200M rows and between 4 and 165 columns; even .astype('category') won’t save me, as about 75% of the columns are numerical or unique strings
  3. user experience needs to be reasonably fast
  4. there are multiple data sources that change (e.g. I want all the marketing responses for company A, then a minute later I want the marketing responses for company B)
  5. I can only hold one or two of these large data objects in memory at once, preferably one
  6. *classic Dash thing*: data must be shared between many callbacks
  7. some callbacks change parts of the shared data object
  8. I don’t have the bandwidth to manage a lot of IT hardware/software

Because we can’t use global variables :wink:, I need a super-fast, relatively simple way to read large new objects into memory and then share them between callbacks.


Out with the (excellent) old - Parquet

Each callback would look something like this:

```python
import pandas as pd
...
@app.callback(...)
def data_operation():
    # takes about 5-20 seconds for tens of millions of rows, slower with more columns - Parquet is FAST
    df = pd.read_parquet("path/to/file.parq", columns=['col1', 'col2'])
    # do the operations: takes 5-10 seconds with optimizations and parallel processing
    return the_output
```

Overall time: 10-45s

This was (and is) fine… but from a user-experience perspective, it’s just not fast enough. Users want to see things change when they click buttons.

Enter the new - Plasma in-memory object store.

Read this for more detail:

https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

Plasma is part of the Apache Arrow project. It’s kind of like Redis, but it can hold very large objects. For small objects I still just use JSON or Pickle files on disk or on the network.

My code is adapted from the Pandas integration section of their docs (linked above). Basic Python objects are much easier: just `object_id = client.put(thing)` and `client.get(object_id)`. Overall, my code is just a function for writing, a function for reading, and a pickled dictionary index on disk that keeps track of the object IDs.
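There is one weak spot in the on-disk index approach. Another callback, possibly in a different worker process, can read plasma_state.pkl while it is being rewritten, and a plain `open(..., 'wb')` truncates the file before writing. Below is a minimal standard-library sketch of a safer index, under a few assumptions. `load_index` and `set_index_entry` are hypothetical helper names. The object ID can be any picklable value, such as a `plasma.ObjectID`. Each write goes to a temporary file that `os.replace` then swaps in atomically.

```python
import os
import pickle
import tempfile


def load_index(path):
    """Return the name -> object-ID mapping, or an empty dict if no index exists yet."""
    try:
        with open(path, "rb") as f:
            return pickle.load(f)
    except FileNotFoundError:
        return {}


def set_index_entry(path, name, object_id):
    """Point `name` at `object_id`; return the ID it previously pointed at (or None)."""
    index = load_index(path)
    previous = index.get(name)
    index[name] = object_id
    # Write the new index to a temp file in the same directory, then atomically
    # swap it in, so a concurrent reader never sees a half-written pickle.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(index, f)
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)  # clean up the temp file if anything went wrong
        raise
    return previous
```

The atomic swap prevents torn reads, but it does not prevent lost updates. If two callbacks update different names at the same moment, each rewrites the whole dict, so one entry can be lost. That case would still need a file lock.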

```shell
# Start the plasma_store with up to 15 GB of memory (it claims memory as needed for new objects)
$ plasma_store -m 15000000000 -s /tmp/plasma
```
```python
# /functions.py
import pickle

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma  # only available in older pyarrow releases

PLASMA_STATE = '.../plasma_state.pkl'  # pickled dict: name -> plasma.ObjectID

# Each time the data source changes, say, from company A to company B,
# I write the new data to Plasma if it's not already there.
def write_to_plasma(thing, which='name_of_thing'):
    # connect to the running plasma_store
    client = plasma.connect('/tmp/plasma')

    # convert the DataFrame to an Arrow record batch under a random 20-byte ID
    record_batch = pa.RecordBatch.from_pandas(thing)
    object_id = plasma.ObjectID(np.random.bytes(20))

    # first pass: write to a mock stream only to measure the serialized size
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    data_size = mock_sink.size()

    # second pass: allocate exactly that much in Plasma, write into it, and seal it
    buf = client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    client.seal(object_id)  # the object is immutable from here on

    # end the client
    client.disconnect()

    # record the new object ID in the on-disk index (create the index on first use)
    try:
        with open(PLASMA_STATE, 'rb') as f:
            plasma_state = pickle.load(f)
    except FileNotFoundError:
        plasma_state = {}
    plasma_state[which] = object_id
    with open(PLASMA_STATE, 'wb') as f:
        pickle.dump(plasma_state, f)

# Reads the data each time a callback needs it
def read_from_plasma(which):
    # look up the current object ID for the thing you want
    with open(PLASMA_STATE, 'rb') as f:
        plasma_state = pickle.load(f)
    object_id = plasma_state[which]

    # connect and fetch the sealed buffer
    client = plasma.connect('/tmp/plasma')
    [data] = client.get_buffers([object_id])

    # deserialize the Arrow stream back into a DataFrame
    buffer = pa.BufferReader(data)
    reader = pa.RecordBatchStreamReader(buffer)
    record_batch = reader.read_next_batch()
    results = record_batch.to_pandas()

    # close out and finish
    client.disconnect()

    return results
```
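Stripped of the Arrow details, write_to_plasma and read_from_plasma follow a two-pass pattern. First they find the serialized size (the MockOutputStream pass). Then they allocate a buffer of exactly that size in shared memory, write into it, and let any other process attach to it by ID. Recent pyarrow releases no longer ship `pyarrow.plasma` (it was deprecated in Arrow 10.0 and removed in 12.0). So here is a minimal sketch of the same pattern that uses Python's standard-library `multiprocessing.shared_memory` as a stand-in. This is not Plasma's API. `put_object` and `get_object` are hypothetical names, pickle replaces the Arrow IPC stream, and the shared-memory block name plays the role of the object ID.

```python
import pickle
import struct
from multiprocessing import shared_memory

_HEADER = struct.Struct("<Q")  # 8-byte little-endian payload length


def put_object(obj):
    """Serialize `obj`, allocate a shared block sized to fit, copy the bytes in, return the block."""
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    # The exact size is known before allocating (Plasma's MockOutputStream pass plays this role).
    size = _HEADER.size + len(payload)
    block = shared_memory.SharedMemory(create=True, size=size)
    # Store the length up front: some platforms round the block size up to a page.
    _HEADER.pack_into(block.buf, 0, len(payload))
    block.buf[_HEADER.size:size] = payload
    return block  # the owner keeps this handle alive and calls unlink() when done


def get_object(name):
    """Attach to an existing block by name (from any process) and deserialize its contents."""
    block = shared_memory.SharedMemory(name=name)
    try:
        (length,) = _HEADER.unpack_from(block.buf, 0)
        # bytes(...) copies the data out, so the block can be closed safely afterwards
        return pickle.loads(bytes(block.buf[_HEADER.size:_HEADER.size + length]))
    finally:
        block.close()
```

Unlike Plasma, this sketch has no store process and no seal step. The creating process must keep its handle alive and call `unlink()` when the data is replaced. On Windows, the block disappears once every handle is closed.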
```python
# /app.py
from functions import read_from_plasma

@app.callback(...)
def do_something(...):
    # takes about 200-1200 ms
    df = read_from_plasma('company_A_data')  # e.g.
    # do the operations; no time savings here, still five to ten seconds with optimizations
    return the_output
```

Overall time: six to twelve seconds :rocket:
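Two details in this flow are easy to miss. First, the comment above write_to_plasma says new data is only written "if it's not already there". Second, Plasma objects are immutable once sealed, so a callback that changes part of the shared data has to write a new object and repoint the index. Here is a toy sketch of both ideas, assuming plain dicts stand in for the Plasma store and the pickled index. `NamedObjectCache`, `get_or_load` and `replace` are hypothetical names, not part of any library.

```python
import os


class NamedObjectCache:
    """Toy stand-in for Plasma plus the pickled index, using two plain dicts."""

    def __init__(self):
        self.store = {}  # object ID -> object; the cache never overwrites an entry, like sealed Plasma objects
        self.index = {}  # name (e.g. 'company_A_data') -> current object ID

    def _put(self, obj):
        # random 20-byte ID, mirroring plasma.ObjectID(np.random.bytes(20))
        object_id = os.urandom(20)
        self.store[object_id] = obj
        return object_id

    def get_or_load(self, name, loader):
        """Return the object stored under `name`; call `loader()` only if it isn't there yet."""
        object_id = self.index.get(name)
        if object_id is None:
            object_id = self._put(loader())
            self.index[name] = object_id
        return self.store[object_id]

    def replace(self, name, new_obj):
        """'Change' shared data by storing a new object and repointing `name` at it."""
        old_id = self.index.get(name)
        self.index[name] = self._put(new_obj)
        if old_id is not None:
            del self.store[old_id]  # drop the superseded object from this toy store
```

In the real setup, `loader` would be something like the Parquet read, and the two dicts would be the Plasma store plus plasma_state.pkl.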


Anyways, I hope some of you find this info useful: it’s certainly helped me speed up my massive-data Dash apps and give users more functionality. The only limitations I see so far are that the API is not stable and that it only works with Arrow-compatible objects. But given that Pandas, NumPy, and almost all common Python objects are compatible, I’ve found it perfect for my uses.
