Show and tell: Super-fast way to share data between callbacks: Apache Plasma in-memory object store

I’ve been following the Arrow project and have been experimenting with Apache Plasma as a shared-memory data backend. Has anyone else used this? It gives a big performance boost compared to reading large data from disk or caching large objects. I had been reading data in for each callback with Parquet (which is already fast); with Plasma I saw write speedups of about 5x and read speedups of 10-50x, because although there is still serialization, it all happens in memory.

Docs: https://arrow.apache.org/docs/python/plasma.html#

Some of my Dash projects involve

  1. users interact with the data in resource-intensive ways
  2. data sizes are massive: between 10M and 200M rows and between 4 and 165 columns; even astype('category') won’t save me, as about 75% of the columns are numerical or unique strings
  3. user experience needs to be reasonably fast
  4. there are multiple data sources that change (e.g. I want all the marketing responses for company A, then in 1 minute I want the marketing responses for company B)
  5. I can only hold one or two of these large data objects in memory at once, preferably one
  6. *classic Dash thing* must share data between many callbacks
  7. some callbacks change parts of the shared data object
  8. I don’t have enough bandwidth to manage a lot of IT hardware/software

Because we can’t use global variables :wink:, I need a super fast, relatively simple way to read large new objects into memory and then share them between functions.


Out with the (excellent) old - Parquet

Each callback would look something like this:

...
@app.callback(...)
def data_operation():
    # takes about 5-20 seconds for tens of millions of rows, slower with more columns - Parquet is FAST
    df = pd.read_parquet("path/to/file.parq", columns=['col1', 'col2'])
    # do the operations - takes 5-10 seconds with optimizations and parallel processing
    return the_output

Overall time: 10-45s

This was/is fine…but from a user experience perspective, it’s just not enough. Users want to see things change when they click buttons.

Enter the new - Plasma in-memory object store.

Read this for detail.

https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

This is part of the Apache Arrow project. It’s kind of like Redis, but it can be used with very large objects. For small objects I still just use JSON or Pickle files on the disk/network.

The code below is adapted from their docs (above) on Pandas integration. Basic Python objects are much easier: just object_id = client.put(thing) and client.get(object_id). My code is just a function for writing and a function for reading, plus a Pickle-d dictionary index on disk that keeps track of object IDs.
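
The on-disk index can be sketched by itself with only the standard library. This is one possible shape for it, not the exact code used in the app: the helper names (`load_index`, `save_index`, `register`, `lookup`) are hypothetical, and random 20-byte strings stand in for `plasma.ObjectID` values.

```python
import os
import pickle
import tempfile


def load_index(path):
    """Return the name -> object ID mapping, or an empty dict if no index exists yet."""
    if not os.path.exists(path):
        return {}
    with open(path, "rb") as f:
        return pickle.load(f)


def save_index(path, index):
    """Write to a temp file in the same directory, then swap it in with os.replace."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "wb") as f:
        pickle.dump(index, f)
    os.replace(tmp_path, path)


def register(path, name, object_id):
    """Record (or overwrite) the object ID stored under `name`."""
    index = load_index(path)
    index[name] = object_id
    save_index(path, index)


def lookup(path, name):
    """Return the object ID registered under `name`; raises KeyError if absent."""
    return load_index(path)[name]


# Demo with random 20-byte IDs standing in for plasma.ObjectID values.
with tempfile.TemporaryDirectory() as tmp_dir:
    index_path = os.path.join(tmp_dir, "plasma_state.pkl")
    id_a = os.urandom(20)
    id_b = os.urandom(20)
    register(index_path, "company_A_data", id_a)
    register(index_path, "company_B_data", id_b)
    found_a = lookup(index_path, "company_A_data")
    names = sorted(load_index(index_path))
```

Writing to a temporary file and renaming it means a callback reading the index should not see a half-written pickle. It does not protect against two writers updating the index at the same moment; that would need a lock.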

# Start the plasma_store with up to 15GB of memory (it opens up memory as needed for new things)
$ plasma_store -m 15000000000 -s /tmp/plasma
# /functions.py
import pickle

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma

# each time the data source changes, say, from company A to company B, I write the new data to Plasma if it's not already there
def write_to_plasma(thing,which='name_of_thing'):
    # get the client
    client = plasma.connect('/tmp/plasma')

    #save the thing to Plasma
    record_batch = pa.RecordBatch.from_pandas(thing)
    object_id = plasma.ObjectID(np.random.bytes(20))
    mock_sink = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    data_size = mock_sink.size()
    buf = client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)
    stream_writer.close()
    client.seal(object_id)

    # end the client
    client.disconnect()

    # write the new object ID
    with open('.../plasma_state.pkl','rb') as f:
        plasma_state = pickle.load(f)
    plasma_state[which] = object_id
    with open('.../plasma_state.pkl','wb') as f:
        pickle.dump(plasma_state, f)

# reads the data each time
def read_from_plasma(which):
    # get the current plasma_state
    with open('.../plasma_state.pkl','rb') as f:
        plasma_state = pickle.load(f)

    # get the object ID for the thing you want
    object_id = plasma_state[which]

    # get the client and read it
    client = plasma.connect('/tmp/plasma')

    # fetch the sealed buffer and decode the Arrow stream back into a DataFrame
    [data] = client.get_buffers([object_id])
    buffer = pa.BufferReader(data)
    reader = pa.RecordBatchStreamReader(buffer)
    record_batch = reader.read_next_batch()
    results = record_batch.to_pandas()

    # close out and finish
    client.disconnect()

    return results
# /app.py
@app.callback(...)
def do_something(...):
    # takes about 200ms - 1200 ms
    df = read_from_plasma('company_A_data') # e.g.
    # do the operations, no time savings here; five to ten seconds with optimizations
    return the_output

Overall time: six to twelve seconds :rocket:
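
To check where the time goes in a callback (loading versus computing), a small timer helps. This is a generic sketch using only the standard library; `fake_load` and `fake_compute` are placeholders for `read_from_plasma` and the real operations, and the numbers it prints are not measurements from the app above.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


def fake_load():
    """Placeholder for read_from_plasma(...); sleeps briefly instead of loading data."""
    time.sleep(0.02)
    return list(range(1000))


def fake_compute(data):
    """Placeholder for the real per-callback operations."""
    return sum(data)


timings = {}
with timed("load", timings):
    data = fake_load()
with timed("compute", timings):
    result = fake_compute(data)

for label, seconds in timings.items():
    print(f"{label}: {seconds * 1000:.1f} ms")
```

Splitting the timing this way makes it clear when the load step has stopped being the bottleneck and the remaining seconds are all in the operations themselves.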


Anyways, I hope that some of you find this info useful - it’s certainly helped me speed up my massive-data Dash apps and increase the amount of functionality I can give to users. The only limitations I see so far are that the API is not stable and use is limited to Arrow-compatible objects. But given that Pandas, Numpy, and almost all common Python objects are compatible, I’ve found it perfect for my uses.

Thanks for this example. I also work with very large data sets and build applications that rely heavily on user input. So, I am always on the lookout for more speed and will definitely try Plasma on my next project. (I currently use Redis and serialize the data with Arrow.)

One thing that is slightly confusing to me is object serialization. Looking at your write-up and at the Plasma page, it reads as if no serialization/deserialization is required to store/extract a Pandas dataframe from Plasma. Is this true?

You do need to convert the dataframe into a pyarrow.RecordBatch in memory in this example, as in

def write_to_plasma(...):
    ...
    record_batch = pa.RecordBatch.from_pandas(thing)
    ...

See docs:

Very helpful. Thanks

If you’re here, see my new post on this - I’ve built a module, brain-plasma, with a basic API for a shared-memory object namespace for Python objects on top of the Plasma API - it simplifies this process significantly.