Real-time data streaming in Jupyter

Build dashboards for live data streams in Jupyter Notebooks

Avril Aysha
Python in Plain English


Photo by Chris Liverani on Unsplash

Jupyter notebooks for streaming data

Jupyter notebooks are a great tool for working with data. They give you a friendly, interactive interface in which you can process data comfortably and visualise it in intuitive, human-readable ways. It’s no surprise that the tool is extremely popular among data professionals, with millions of public notebooks shared on GitHub.

Most data analysts use Jupyter notebooks for static (or “batch”) workflows; Jupyter has historically been less popular for streaming. Working with real-time data streams in Jupyter can feel intimidating, and inadequate or overly complicated tooling often leads to mixed results.

This article aims to change that. By working through this tutorial, you will learn how to visualise a live data stream in real time from the familiar context of a Jupyter Notebook. You will use Bokeh, Panel and Pathway to build a real-time data visualisation dashboard that alerts you when the data hits a critical threshold. You can follow along on this page or run the code yourself in Colab or from GitHub.

Let’s jump in! 🪂

What we’ll do 👩🏽‍💻

In this tutorial, you will take a stream of financial data and implement a simple trading algorithm using Bollinger Bands. This will be a helpful example of visualising streaming data and will also show the usefulness of getting critical alerts in real-time.

You don’t need to fully understand the algorithm or the financial terminology. Focus instead on the steps: taking a live data stream, performing some computation on it in real time, and outputting a live visualisation dashboard in your Jupyter Notebook that updates on the fly. These steps generalise to any data streaming use case.

Concretely, we will compute the 1-minute running mean of the Volume Weighted Average Price (vwap) and, over a 20-minute window, the mean price together with its volatility, measured as the Volume Weighted Standard Deviation (vwstd). The 20-minute statistics define two bands around the mean price, placed two standard deviations above and below it, and most price movements stay between them. Intuitively, when the price rises above the upper band, it is abnormally high and likely to drop, so it is a good moment to SELL and we’ll raise an alert. Likewise, when the price falls below the lower band, it is abnormally low and may rise back toward the mean, so we’ll raise an alert that it’s a good moment to BUY. For further reliability, BUY/SELL actions are triggered only when there is a significant volume of trades, which indicates that the outlying price is not a one-off event.
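To make the rule concrete, here is a minimal plain-Python sketch of the decision logic described above, independent of Pathway. The function name `bollinger_action` is hypothetical, and the default volume threshold of 10,000 mirrors the alert condition used in the Pathway code later in this tutorial; the bands are assumed to be the 20-minute mean plus or minus two standard deviations.

```python
def bollinger_action(
    vwap: float,
    lower: float,
    upper: float,
    volume: float,
    min_volume: float = 10_000,
) -> str:
    """Return "buy", "sell" or "hodl" for one 1-minute observation (illustrative sketch).

    vwap:   1-minute volume-weighted average price
    lower:  lower Bollinger Band (20-minute mean - 2 * std)
    upper:  upper Bollinger Band (20-minute mean + 2 * std)
    volume: traded volume in that minute; low-volume outliers are ignored
    """
    if volume <= min_volume:
        return "hodl"  # not enough trading activity to trust the outlier
    if vwap > upper:
        return "sell"  # abnormally high price, expected to revert downwards
    if vwap < lower:
        return "buy"  # abnormally low price, expected to revert upwards
    return "hodl"  # price inside the bands: do nothing


# Bands around a 20-minute mean of 190.0 with a standard deviation of 0.25
mean, std = 190.0, 0.25
lower, upper = mean - 2 * std, mean + 2 * std  # 189.5 and 190.5
print(bollinger_action(190.7, lower, upper, volume=25_000))  # sell
print(bollinger_action(189.3, lower, upper, volume=25_000))  # buy
print(bollinger_action(189.3, lower, upper, volume=5_000))   # hodl (low volume)
print(bollinger_action(190.1, lower, upper, volume=25_000))  # hodl (inside bands)
```

Checking the volume first means that even a large price excursion produces no signal in a quiet minute, which is exactly the "not a one-off event" safeguard described above.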

First, import the necessary libraries:

  • pathway for data processing,
  • datetime for date manipulation,
  • bokeh and panel for dashboarding.

```python
import datetime

import bokeh.models
import bokeh.plotting
import panel

import pathway as pw
```

Next, fetch the sample data from GitHub. This data was generated using polygon.io.

```
!wget -nc https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv
```
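The `!wget` line is IPython shell syntax and only works inside a notebook. If you prefer to run the pipeline as a plain Python script, a minimal standard-library equivalent might look like the sketch below. `fetch_if_missing` is a hypothetical helper name; it mimics `wget -nc` (no-clobber) by skipping the download when the file already exists.

```python
import urllib.request
from pathlib import Path


def fetch_if_missing(url: str, dest: str) -> Path:
    """Download url to dest unless dest already exists (similar to `wget -nc`)."""
    path = Path(dest)
    if path.exists():
        return path  # no-clobber: keep the existing local copy untouched
    urllib.request.urlretrieve(url, path)  # stream the remote file to disk
    return path


# Usage (requires network access):
# fetch_if_missing(
#     "https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv",
#     "ticker.csv",
# )
```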

Data source setup 🛠

Create a streaming data source that replays data in a CSV file. This is an easy way to simulate a live data stream without any infrastructure hassle. You can of course also use Pathway with a real, production-grade data stream, for example from Kafka or Redpanda.

The input_rate parameter controls how fast the data is replayed, in rows per second.

💡 No data processing actually happens when you run this cell. We are building a computational graph that will only be executed at the end of the tutorial. This allows the streaming engine to optimise the computations and perform them as fast as possible when the data starts streaming.

```python
fname = "ticker.csv"
schema = pw.schema_from_csv(fname)
data = pw.demo.replay_csv(fname, schema=schema, input_rate=1000)

# For static data exploration, use this instead:
# data = pw.io.csv.read(fname, schema=schema, mode="static")

# Parse the timestamps: t is stored in epoch milliseconds, fromtimestamp expects seconds
data = data.with_columns(
    t=pw.apply_with_type(
        datetime.datetime.fromtimestamp, pw.DATE_TIME_NAIVE, data.t / 1000.0
    )
)
```

As you probably noticed, the code block above also includes a commented-out section. You can use this line instead of data = pw.demo.replay_csv(…) to test the workflow with static data. This is the only change you need to make in your code to switch between static and stream processing.
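To build intuition for what a replayed source does, here is a rough plain-Python analogue. It is not Pathway's implementation: it reads CSV rows, converts the millisecond `t` column to a naive `datetime` in the same way as the Pathway cell, and optionally paces the rows at about `input_rate` rows per second. Passing `input_rate=None` yields every row immediately, mirroring the static mode. The helper name `read_rows` and the tiny inline CSV are made up for illustration; all other columns stay as strings.

```python
import csv
import io
import time
from datetime import datetime
from typing import Iterator, Optional, TextIO


def read_rows(f: TextIO, input_rate: Optional[float] = None) -> Iterator[dict]:
    """Yield CSV rows as dicts, parsing the epoch-millisecond `t` column.

    With input_rate set, sleep between rows so that roughly input_rate rows
    are emitted per second (a simulated stream). With input_rate=None, all
    rows are emitted as fast as possible (static mode).
    """
    delay = 1.0 / input_rate if input_rate else 0.0
    for row in csv.DictReader(f):
        row["t"] = datetime.fromtimestamp(int(row["t"]) / 1000.0)
        yield row
        if delay:
            time.sleep(delay)  # throttle to simulate data arriving over time


# Hypothetical sample with millisecond timestamps one minute apart
sample = (
    "ticker,t,volume,vwap\n"
    "AAPL,1701000000000,12000,189.9\n"
    "AAPL,1701000060000,8000,190.2\n"
)
for row in read_rows(io.StringIO(sample), input_rate=10):
    print(row["t"], row["ticker"], row["vwap"])
```

Because it is a generator, nothing is read until a consumer asks for the next row, which loosely echoes how Pathway only starts processing once the pipeline is run.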

Let’s Trade! 💸

Now it’s time to build your trading algorithm. There is no need to fully understand the terminology or the math here. What’s most important to grasp is how you are taking a stream of data and performing a windowing transformation to get more analytical value out of the raw data.

Create the 20-minute Bollinger Bands

Start by computing the Bollinger Bands themselves from 20-minute statistics: the volume-weighted mean price and its volatility, measured as the Volume Weighted Standard Deviation. Use a sliding window that, every minute, computes the volume-weighted price mean and standard deviation over the past 20 minutes of data. The behavior option tells Pathway to emit the statistics only when a window is finished, because we do not want to see incomplete results.
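A sliding window with a 1-minute hop and a 20-minute duration assigns each timestamp to 20 overlapping windows. The sketch below enumerates those windows for one timestamp. It assumes windows are half-open, `[start, start + duration)`, and aligned to multiples of the hop from a fixed origin (the Unix epoch here); Pathway's own alignment rules and its `exactly_once_behavior` are not reproduced. `sliding_windows` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def sliding_windows(
    t: datetime,
    hop: timedelta,
    duration: timedelta,
    origin: datetime = datetime(1970, 1, 1),
) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) sliding window that contains timestamp t."""
    # Latest window start at or before t, aligned to origin + k * hop
    start = origin + ((t - origin) // hop) * hop
    windows = []
    while start > t - duration:  # window [start, start + duration) still covers t
        windows.append((start, start + duration))
        start -= hop
    return windows[::-1]  # oldest window first


t = datetime(2023, 11, 26, 12, 34, 30)
windows = sliding_windows(t, hop=timedelta(minutes=1), duration=timedelta(minutes=20))
print(len(windows))  # 20
print(windows[0][0], windows[-1][0])  # 2023-11-26 12:15:00 2023-11-26 12:34:00
```

Setting `hop` equal to `duration` collapses this to a single window per timestamp, which is the tumbling window used for the 1-minute statistics.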

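To compute the standard deviation, use the identity

$$\mathrm{vwstd} = \sqrt{\frac{\sum_i v_i p_i^2}{\sum_i v_i} - \mathrm{vwap}^2}, \qquad \mathrm{vwap} = \frac{\sum_i v_i p_i}{\sum_i v_i},$$

where $v_i$ is the volume and $p_i$ the price (the vwap column) of each input row in the window. This is easily expressible using Pathway reducers: we first compute the total volume, the volume-weighted sum of prices, and the volume-weighted sum of squared prices. We then post-process them to obtain the mean (vwap), the standard deviation (vwstd), and the Bollinger Bands placed at vwap ± 2·vwstd.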

Or in simpler terms: the code block below takes your incoming data stream and calculates important statistics in real time. A new set of statistics is emitted every minute as each 20-minute window closes, so you can identify critical moments as they happen.

```python
minute_20_stats = (
    data.windowby(
        pw.this.t,
        # Every minute, look back over the last 20 minutes of data
        window=pw.temporal.sliding(
            hop=datetime.timedelta(minutes=1), duration=datetime.timedelta(minutes=20)
        ),
        # Emit each window's result once, when the window is complete
        behavior=pw.temporal.exactly_once_behavior(),
        # Compute separate windows for every ticker
        instance=pw.this.ticker,
    )
    .reduce(
        ticker=pw.this._pw_instance,
        t=pw.this._pw_window_end,
        volume=pw.reducers.sum(pw.this.volume),
        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
        transact_total2=pw.reducers.sum(pw.this.volume * pw.this.vwap**2),
    )
    .with_columns(vwap=pw.this.transact_total / pw.this.volume)
    .with_columns(
        vwstd=(pw.this.transact_total2 / pw.this.volume - pw.this.vwap**2) ** 0.5
    )
    .with_columns(
        bollinger_upper=pw.this.vwap + 2 * pw.this.vwstd,
        bollinger_lower=pw.this.vwap - 2 * pw.this.vwstd,
    )
)
```
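As a sanity check on the reducers, this plain-Python sketch accumulates the same three sums (total volume, Σ volume·price, Σ volume·price²) for a small made-up window and compares the resulting standard deviation with a direct two-pass weighted computation. The helper name `bollinger_stats` and the sample numbers are hypothetical; unlike the Pathway code, the sketch also clamps tiny negative rounding errors to zero before taking the square root.

```python
import math
from typing import Iterable, Tuple


def bollinger_stats(rows: Iterable[Tuple[float, float]]) -> dict:
    """Compute vwap, vwstd and the bands from (volume, price) pairs using only sums."""
    volume = transact_total = transact_total2 = 0.0
    for v, p in rows:
        volume += v
        transact_total += v * p
        transact_total2 += v * p * p
    vwap = transact_total / volume
    # Volume-weighted Var = E[p^2] - E[p]^2; clamp tiny negative rounding errors
    vwstd = math.sqrt(max(transact_total2 / volume - vwap**2, 0.0))
    return {
        "vwap": vwap,
        "vwstd": vwstd,
        "bollinger_upper": vwap + 2 * vwstd,
        "bollinger_lower": vwap - 2 * vwstd,
    }


window = [(100.0, 10.0), (300.0, 12.0), (100.0, 14.0)]  # (volume, price)
stats = bollinger_stats(window)

# Two-pass check: weighted mean first, then weighted squared deviations
total_v = sum(v for v, _ in window)
mean = sum(v * p for v, p in window) / total_v
std = math.sqrt(sum(v * (p - mean) ** 2 for v, p in window) / total_v)
print(stats["vwap"], mean)  # 12.0 12.0
print(round(stats["vwstd"], 9), round(std, 9))
```

The single-pass form needs only running sums, which is why it maps neatly onto streaming reducers; the two-pass form is numerically safer but requires the mean before the deviations can be computed.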

Compute the 1-minute running mean

Next, compute the second ingredient of the strategy: the 1-minute running mean, i.e. the volume-weighted mean price over the last minute of trades, which will be compared against the bands.

The code is analogous to the 20-minute statistics but simpler: you can use a tumbling window and don’t have to compute the standard deviation.

```python
minute_1_stats = (
    data.windowby(
        pw.this.t,
        # Non-overlapping 1-minute windows
        window=pw.temporal.tumbling(datetime.timedelta(minutes=1)),
        behavior=pw.temporal.exactly_once_behavior(),
        instance=pw.this.ticker,
    )
    .reduce(
        ticker=pw.this._pw_instance,
        t=pw.this._pw_window_end,
        volume=pw.reducers.sum(pw.this.volume),
        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
    )
    .with_columns(vwap=pw.this.transact_total / pw.this.volume)
)
```
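A tumbling window puts every row into exactly one bucket. The rough plain-Python analogue below groups `(ticker, t, volume, price)` rows into 1-minute buckets per ticker and computes each bucket's volume-weighted average price. It is keyed by window end, as the Pathway code does with `_pw_window_end`, and assumes windows aligned to whole minutes from the Unix epoch. `tumbling_vwap` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def tumbling_vwap(
    rows: Iterable[Tuple[str, datetime, float, float]],
    width: timedelta = timedelta(minutes=1),
    origin: datetime = datetime(1970, 1, 1),
) -> Dict[Tuple[str, datetime], dict]:
    """Return {(ticker, window_end): {"volume": ..., "vwap": ...}}."""
    sums = defaultdict(lambda: [0.0, 0.0])  # [total volume, sum of volume * price]
    for ticker, t, volume, price in rows:
        # Each timestamp belongs to exactly one window [start, start + width)
        start = origin + ((t - origin) // width) * width
        acc = sums[(ticker, start + width)]  # key by window end
        acc[0] += volume
        acc[1] += volume * price
    return {key: {"volume": v, "vwap": vp / v} for key, (v, vp) in sums.items()}


rows = [
    ("AAPL", datetime(2023, 11, 26, 12, 0, 10), 100.0, 190.0),
    ("AAPL", datetime(2023, 11, 26, 12, 0, 50), 300.0, 190.4),
    ("AAPL", datetime(2023, 11, 26, 12, 1, 5), 200.0, 190.8),
]
for (ticker, end), s in sorted(tumbling_vwap(rows).items()):
    print(ticker, end, s["volume"], round(s["vwap"], 2))
```

Because both the 1-minute tumbling windows and the 20-minute sliding windows (with a 1-minute hop) end on minute boundaries, their window-end timestamps line up, which is what makes the join in the next step possible.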

Joining the Bollinger Bands

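Now join the 20-minute and 1-minute statistics on ticker and window end time, gathering all the information needed for alerting in one place. Alert triggering is then straightforward: an alert fires when the 1-minute volume exceeds 10,000 and the 1-minute mean price lies outside the bands.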

```python
joint_stats = (
    # Match each 1-minute row with the 20-minute row for the same ticker and window end
    minute_1_stats.join(
        minute_20_stats, pw.left.t == pw.right.t, pw.left.ticker == pw.right.ticker
    )
    .select(
        *pw.left,
        bollinger_lower=pw.right.bollinger_lower,
        bollinger_upper=pw.right.bollinger_upper,
    )
    # Alert when trading volume is high and the price leaves the bands
    .with_columns(
        is_alert=(pw.this.volume > 10000)
        & (
            (pw.this.vwap > pw.this.bollinger_upper)
            | (pw.this.vwap < pw.this.bollinger_lower)
        )
    )
    .with_columns(
        action=pw.if_else(
            pw.this.is_alert,
            pw.if_else(pw.this.vwap > pw.this.bollinger_upper, "sell", "buy"),
            "hodl",
        )
    )
)
alerts = joint_stats.filter(pw.this.is_alert)
```
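The join matches each 1-minute row with the 20-minute row that has the same ticker and window end time, and drops rows without a match (an inner join). A rough dictionary-based analogue, with a hypothetical `join_stats` helper and toy data, could look like this:

```python
from datetime import datetime
from typing import Dict, List, Tuple

Key = Tuple[str, datetime]  # (ticker, window end)


def join_stats(minute_1: Dict[Key, dict], minute_20: Dict[Key, dict]) -> List[dict]:
    """Inner-join 1-minute and 20-minute statistics on (ticker, window end)."""
    joined = []
    for key, left in minute_1.items():
        right = minute_20.get(key)
        if right is None:
            continue  # inner join: skip minutes with no matching 20-minute window
        joined.append({
            **left,  # keep every 1-minute column, similar to *pw.left
            "bollinger_lower": right["bollinger_lower"],
            "bollinger_upper": right["bollinger_upper"],
        })
    return joined


t1, t2 = datetime(2023, 11, 26, 12, 21), datetime(2023, 11, 26, 12, 22)
minute_1 = {
    ("AAPL", t1): {"ticker": "AAPL", "t": t1, "volume": 15000.0, "vwap": 190.7},
    ("AAPL", t2): {"ticker": "AAPL", "t": t2, "volume": 9000.0, "vwap": 190.1},
}
minute_20 = {("AAPL", t1): {"bollinger_lower": 189.5, "bollinger_upper": 190.5}}
joined = join_stats(minute_1, minute_20)
print(len(joined))  # 1 (t2 has no 20-minute row in this toy input)
```

In a streaming engine the same matching happens incrementally as rows from either side arrive, rather than over two complete dictionaries.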

Visualize your real-time data stream 🔮

Now let’s create a Bokeh plot and a Panel table: the plot shows the Bollinger Bands along with the 1-minute running mean of the price, and marks the prices at which buy and sell decisions were made. The table gathers all the decisions in one place for further processing, such as reducing it to evaluate the historical gains of the strategy.

When the cell is executed, placeholder containers are created for the plot and table dashboard. They will be populated with live data when the computation is started (i.e. when running the final pw.run() at the end of this tutorial).

```python
def stats_plotter(src):
    actions = ["buy", "sell", "hodl"]
    # Green for buy, red for sell; "#00000000" is fully transparent, so hodl points are hidden
    color_map = bokeh.models.CategoricalColorMapper(
        factors=actions, palette=("#00ff00", "#ff0000", "#00000000")
    )

    fig = bokeh.plotting.figure(
        height=400,
        width=600,
        title="20 minutes Bollinger bands with last 1 minute average",
        x_axis_type="datetime",
        y_range=(188.5, 191),  # fixed y-axis range suited to the sample data
    )
    # 1-minute running mean of the price
    fig.line("t", "vwap", source=src)
    # Shaded area between the lower and upper Bollinger Bands
    band = bokeh.models.Band(
        base="t",
        lower="bollinger_lower",
        upper="bollinger_upper",
        source=src,
        fill_alpha=0.3,
        fill_color="gray",
        line_color="black",
    )

    # Decision markers, coloured by the action column
    fig.scatter(
        "t",
        "vwap",
        color={"field": "action", "transform": color_map},
        size=10,
        marker="circle",
        source=src,
    )

    fig.add_layout(band)
    return fig


# Live plot on the left, table of alerts (newest first) on the right
viz = panel.Row(
    joint_stats.plot(stats_plotter, sorting_col="t"),
    alerts.select(pw.this.ticker, pw.this.t, pw.this.vwap, pw.this.action).show(
        include_id=False, sorters=[{"field": "t", "dir": "desc"}]
    ),
)
viz
```

Run your pipeline in real-time 👨🏻‍🔧

All the hard work is done! The final step is to start the Pathway data processing engine using the command:

```python
pw.run()
```

Watch how the dashboard updates in real time. The Bollinger Bands action trigger seems to be working: the green buy markers are frequently followed by red sell markers at a slightly higher price.
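One way to go beyond eyeballing the markers is to reduce the alerts table to a rough profit figure. The sketch below is a deliberately simple evaluation under loudly stated assumptions: it trades one unit per signal, only goes long (buy, then sell on the next sell signal), fills at the 1-minute vwap, and ignores fees and slippage. `long_only_pnl` and the sample history are hypothetical and say nothing about the real results of the strategy.

```python
from typing import Iterable, Optional, Tuple


def long_only_pnl(alerts: Iterable[Tuple[str, float]]) -> float:
    """Realised profit from trading one unit on (action, price) alerts sorted by time.

    A "buy" opens a position when flat; the next "sell" closes it. Repeated
    buys while holding and sells while flat are ignored, and an open
    position at the end is left unrealised.
    """
    pnl = 0.0
    entry: Optional[float] = None  # price of the held unit, or None when flat
    for action, price in alerts:
        if action == "buy" and entry is None:
            entry = price
        elif action == "sell" and entry is not None:
            pnl += price - entry
            entry = None
    return pnl


history = [("buy", 189.4), ("buy", 189.3), ("sell", 190.6), ("sell", 190.8), ("buy", 189.9)]
print(round(long_only_pnl(history), 2))  # 1.2
```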

While the computation is running, Pathway prints important statistics, such as message processing latency.

Successful evaluation of the code should result in the animation:

animation by author

Jupyter Notebooks for Streaming Data

Congratulations! You have successfully built a live data streaming pipeline with useful data visualisations and real-time alerts, right from a Jupyter notebook 😄

This is just a taste of what is possible. If you’re interested in diving deeper and building a production-grade data science pipeline all the way from data exploration to deployment, you may want to check out the full-length From Experimenting in Jupyter to Production tutorial.

Follow me on LinkedIn for regular data hacks and content 🚀

Happy streaming! 🦦

Originally published at https://pathway.com.
