Injecting Real-Time WebSocket Streams into SQLite Using Python and WAL Hooking
SQLite isn't the first database that comes to mind for real-time streaming, but with Python and a little creativity we can tap into a WebSocket stream and write each message straight into a live SQLite database. This article shows how to combine WebSockets, Python’s sqlite3 module, and SQLite’s Write-Ahead Logging (WAL) mode to build a fast, lightweight real-time ingestion pipeline, with no cloud required.
Why Stream Into SQLite?
- Logging IoT or edge device data
- Storing real-time events in offline-first apps
- Building dashboards without a full database server
Step 1: Enable WAL Mode
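In WAL mode, readers do not block the writer and the writer does not block readers, and each commit is a cheap append to the log file, which suits a workload of frequent small writes: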
import sqlite3

def init_db():
    conn = sqlite3.connect("live_stream.db")
    # The journal mode is stored in the database file, so later connections inherit WAL.
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS stream_data (
            id INTEGER PRIMARY KEY,
            timestamp TEXT,
            payload TEXT
        )
    """)
    conn.commit()
    conn.close()
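To confirm that WAL is actually in effect, the result row of the PRAGMA can be read back. The sketch below is one way to do that; `open_wal_connection` is a hypothetical helper, not part of the article's pipeline, and `synchronous=NORMAL` is an assumption about how much durability you are willing to trade for write speed.

```python
import sqlite3


def open_wal_connection(path):
    """Open a connection, request WAL mode, and return (conn, reported_mode)."""
    conn = sqlite3.connect(path)
    # The PRAGMA returns one row holding the journal mode now in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL;").fetchone()[0]
    # Assumption: NORMAL is an acceptable durability level for this workload.
    # Unlike journal_mode, this setting applies to this connection only.
    conn.execute("PRAGMA synchronous=NORMAL;")
    return conn, mode
```

For a file-backed database the reported mode should be the string "wal". An in-memory database cannot use WAL and reports its own mode instead, so it is worth checking the value rather than assuming it.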
Step 2: Connect to WebSocket
We’ll use the websockets package to receive and log each message:
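import asyncio, datetime, sqlite3, websockets

async def stream_to_sqlite():
    uri = "wss://echo.websocket.org"  # replace with the URL of your own stream
    async with websockets.connect(uri) as ws:
        while True:
            msg = await ws.recv()
            # Timestamp each message in UTC at the moment it is received.
            save(datetime.datetime.now(datetime.timezone.utc).isoformat(), msg)

def save(ts, payload):
    # One connection and one commit per message: simple, but slow at high message rates.
    conn = sqlite3.connect("live_stream.db")
    conn.execute("INSERT INTO stream_data (timestamp, payload) VALUES (?, ?)", (ts, payload))
    conn.commit()
    conn.close()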
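Opening a connection and committing for every message is easy to follow, but each commit has a fixed cost. If the stream is busy, one possible variation is to keep a single connection open and insert rows in batches. The sketch below assumes the same `stream_data` schema as above; `BatchWriter` and its `batch_size` parameter are hypothetical names introduced for illustration.

```python
import sqlite3


class BatchWriter:
    """Buffer rows in memory and insert them in one transaction per batch."""

    def __init__(self, db_path, batch_size=100):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS stream_data ("
            "id INTEGER PRIMARY KEY, timestamp TEXT, payload TEXT)"
        )
        self.batch_size = batch_size
        self.pending = []

    def add(self, ts, payload):
        """Queue one row and flush automatically once the batch is full."""
        self.pending.append((ts, payload))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write all queued rows in a single transaction."""
        if not self.pending:
            return
        # "with conn" commits on success and rolls back if an exception is raised.
        with self.conn:
            self.conn.executemany(
                "INSERT INTO stream_data (timestamp, payload) VALUES (?, ?)",
                self.pending,
            )
        self.pending.clear()

    def close(self):
        """Flush any remaining rows, then close the connection."""
        self.flush()
        self.conn.close()
```

The trade-off is durability: rows that are still buffered when the process crashes are lost, so a small batch size or a periodic call to `flush()` is probably wise for data you care about.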
Step 3: Run the Pipeline
if __name__ == "__main__":
    init_db()
    asyncio.run(stream_to_sqlite())
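While the pipeline is running, a second process such as a dashboard can query the same file. The following is a minimal sketch of such a reader, assuming the `stream_data` table defined in Step 1; `latest_events` is a hypothetical helper name.

```python
import sqlite3


def latest_events(db_path, limit=10):
    """Return the most recently inserted rows, newest first."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT id, timestamp, payload FROM stream_data "
            "ORDER BY id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        conn.close()
    return rows
```

In WAL mode the reader sees a consistent snapshot of committed data, so rows from a transaction the writer has not yet committed will not appear until the next query after the commit.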
Pros and Cons
✅ Pros
- No external services or infra
- Fast local writes with WAL
- Portable and file-based
⚠️ Cons
- Not distributed or cloud-native
- Limited write concurrency: SQLite allows only one writer at a time, even in WAL mode
- WebSocket disconnections need handling
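The disconnection problem in the last point can be approached with a retry loop and exponential backoff. The sketch below is one possible shape for it, not a complete solution: `run_with_reconnect` and its parameters are hypothetical, and `consume` stands for any zero-argument coroutine function that opens the WebSocket and processes messages, such as `stream_to_sqlite` above.

```python
import asyncio


async def run_with_reconnect(consume, max_retries=5, base_delay=1.0,
                             max_delay=30.0, retry_on=(OSError,)):
    """Call consume() repeatedly, waiting longer after each failure.

    consume is a hypothetical zero-argument coroutine function that connects
    and processes messages until it returns or raises.
    """
    attempt = 0
    while True:
        try:
            await consume()
            return  # consume() finished normally, so stop retrying
        except retry_on:
            attempt += 1
            if attempt > max_retries:
                raise  # give up and surface the last error to the caller
            # Exponential backoff: base, 2*base, 4*base, ... capped at max_delay.
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            await asyncio.sleep(delay)
```

With the websockets package you would likely pass `retry_on=(OSError, websockets.ConnectionClosed)` so that a dropped connection also triggers a retry. Note that this sketch never resets the attempt counter, so a long-lived stream that drops occasionally will eventually exhaust `max_retries`; resetting the counter after a successful connection is a reasonable refinement.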