Stock Data Extraction Using Apache Kafka


Apr 6, 2025 - 11:28

Overview
This project fetches stock data from the Polygon.io API, publishes it to Apache Kafka, and stores it in an Apache Cassandra database. It is implemented in Python and uses Confluent to manage Kafka.

Step-by-Step Analysis
Step 1: Import Required Libraries

import requests
import os
import json
from dotenv import load_dotenv
from confluent_kafka import Producer

requests: A library for making HTTP requests to external APIs.
os: Used to access environment variables.
json: For encoding and decoding JSON data.
dotenv: Loads environment variables from a .env file, so secrets stay out of the source code.
confluent_kafka: Provides the Kafka Producer class for sending messages to Kafka topics.

Step 2: Load Environment Variables

load_dotenv()

This line reads the .env file and loads its entries into the process environment, so sensitive values such as API keys and connection details do not have to appear in the code.
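Because os.getenv() returns None for a variable that is not set, a typo in the .env file tends to surface later as a confusing authentication or connection error. A minimal sketch of an up-front check, assuming the variable names used in this article; the helper name require_env is hypothetical and not part of python-dotenv:

```python
import os

# Variable names read by the producer script in this article.
REQUIRED_VARS = [
    "API_KEY_DATA",
    "BOOTSTRAP_SERVER",
    "CONFLUENT_API_KEY",
    "CONFLUENT_SECRET_KEY",
]


def require_env(names, env=None):
    """Return {name: value} for every name, or raise if any is missing or empty.

    `require_env` is a hypothetical helper, not part of python-dotenv.
    """
    env = os.environ if env is None else env
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

Calling require_env(REQUIRED_VARS) right after load_dotenv() would make the script fail fast with a message that names the missing variable.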

Step 3: Set Up API Parameters

api_key = os.getenv('API_KEY_DATA')
params = {
    'adjusted': True,
    'apiKey': api_key
}

api_key: Retrieves the API key for accessing the Polygon.io API from environment variables.
params: Dictionary containing parameters for the API request, including whether the data should be adjusted and the API key.

Step 4: Define the API Endpoint

url = 'https://api.polygon.io/v2/aggs/grouped/locale/us/market/stocks/2025-04-04'

Builds the URL for the API request, with the requested date as the last path segment. The date is hard-coded here; in a production scenario it should be derived from the current date.
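One way to make the date dynamic is to request the most recent weekday before today. The sketch below assumes that weekends are the only non-trading days, so it does not account for market holidays; the helper names are hypothetical:

```python
from datetime import date, timedelta

BASE_URL = "https://api.polygon.io/v2/aggs/grouped/locale/us/market/stocks"


def previous_weekday(today):
    """Return the most recent Monday-Friday date strictly before `today`."""
    day = today - timedelta(days=1)
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day -= timedelta(days=1)
    return day


def grouped_daily_url(day):
    """Build the request URL for a given date, formatted as YYYY-MM-DD."""
    return f"{BASE_URL}/{day.isoformat()}"


url = grouped_daily_url(previous_weekday(date.today()))
```

Run on Monday, 7 April 2025, this produces the same URL as the hard-coded one above, because the previous weekday is Friday, 4 April 2025.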

Step 5: Make the API Request

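response = requests.get(url, params=params, timeout=30)  # do not hang forever on a stalled connection
response.raise_for_status()  # raise on HTTP errors such as 401 or 429
data = response.json()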

Sends a GET request to the Polygon.io API using the constructed URL and parameters.
Converts the response to a Python dictionary using response.json() for further processing.
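Before publishing anything, it can help to check that the decoded payload has the expected shape. The sketch below assumes a top-level "status" field and a "results" list whose items carry the ticker under "T"; the accepted status values are an assumption and should be checked against the Polygon.io documentation. The helper name extract_results is hypothetical:

```python
def extract_results(payload):
    """Return the per-ticker records from a decoded grouped-daily payload.

    Assumed shape (an assumption, not confirmed by the article):
    {"status": "OK", "results": [{"T": "AAPL", "o": ..., "c": ..., "t": ...}, ...]}
    """
    if not isinstance(payload, dict):
        raise ValueError("Expected a JSON object at the top level")
    status = payload.get("status")
    if status not in ("OK", "DELAYED"):  # accepted values are an assumption
        raise ValueError(f"Unexpected API status: {status!r}")
    results = payload.get("results") or []
    # Keep only records with a ticker symbol, since it becomes the Kafka key.
    return [item for item in results if item.get("T")]
```

With this in place, the produce loop could iterate over extract_results(data) instead of data.get('results', []).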

Step 6: Configure Kafka Producer

kafka_config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('CONFLUENT_API_KEY'),
    "sasl.password": os.getenv('CONFLUENT_SECRET_KEY'),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}

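kafka_config: A dictionary containing configuration settings for connecting to the Kafka broker. It includes:
bootstrap.servers: The address of the Kafka broker.
security.protocol and the sasl.* settings: Authenticate over SASL_SSL, with the Confluent API key and secret as username and password.
broker.address.family: Restricts broker connections to IPv4.
message.send.max.retries and retry.backoff.ms: Retry a failed send up to 5 times, with a 500 ms backoff between attempts.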

Step 7: Initialize the Kafka Producer

producer = Producer(kafka_config)
topic = 'stocks-prices'

Creates an instance of the Kafka Producer using the specified configuration settings.
Defines the topic name (stocks-prices) where stock data messages will be sent.

Step 8: Produce Messages to Kafka

for item in data.get('results', []):
    stock_symbol = item.get('T', 'unknown_symbol')
    producer.produce(topic, key=stock_symbol, value=json.dumps(item))
    producer.poll(0)

Iterates over the list of stock data results obtained from the API response.
For each item, it extracts the stock symbol and sends the entire item as a JSON string to the specified Kafka topic.
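Calls producer.poll(0) to serve any pending delivery callbacks without blocking. Note that produce() is asynchronous: it only places the message on a local queue, and the actual send happens in the background.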
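The loop above does not register a delivery callback, so failed deliveries go unnoticed. A minimal sketch of the same loop with a callback and a single retry when the local queue is full; publish_all and delivery_report are hypothetical names, and producer is assumed to be a confluent_kafka Producer or any object with the same produce and poll methods:

```python
import json


def delivery_report(err, msg):
    """Delivery callback, invoked once per message from poll() or flush()."""
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")


def publish_all(producer, topic, items):
    """Queue every item on `topic`, keyed by ticker symbol; return the count queued."""
    queued = 0
    for item in items:
        key = item.get("T", "unknown_symbol")
        value = json.dumps(item)
        try:
            producer.produce(topic, key=key, value=value, on_delivery=delivery_report)
        except BufferError:
            # Local queue is full: wait for deliveries to free space, then retry once.
            producer.poll(1.0)
            producer.produce(topic, key=key, value=value, on_delivery=delivery_report)
        producer.poll(0)  # serve delivery callbacks without blocking
        queued += 1
    return queued
```

Keying each message by ticker symbol means that all records for one symbol land on the same partition, which preserves their relative order.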

Step 9: Flush the Producer

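producer.flush()

Blocks until every queued message has been delivered or has failed, so nothing is lost when the script exits.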

Consumer
After producing, it is time for the consumer to read the messages from the topic.
Overview

This Kafka consumer listens to the stocks-prices topic, processes incoming stock data messages, and stores them in an Apache Cassandra database. It is implemented in Python.

Code Breakdown
1. Import Libraries:

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from confluent_kafka import Consumer
import os
import json
from dotenv import load_dotenv

Libraries for Cassandra access, Kafka consumption, and environment variable management.

2. Load Environment Variables:

load_dotenv()

Loads sensitive data such as API keys and database connection details.

Connect to Cassandra:

cluster = Cluster([os.getenv('CASSANDRA_HOST')])
session = cluster.connect()

Establishes a connection to the Cassandra database.

3. Create Keyspace and Table:

session.execute("CREATE KEYSPACE IF NOT EXISTS stocks_data ...")
session.execute("CREATE TABLE IF NOT EXISTS stocks_data.stocks ...")

Creates the necessary keyspace and table for storing stock data.
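The article elides the actual CQL, so the following is only a hypothetical schema that would fit the four fields inserted by the consumer. The replication settings, column names, and primary key are all assumptions, and create_schema is a hypothetical helper that expects a cassandra-driver Session:

```python
KEYSPACE = "stocks_data"

# Hypothetical schema: replication settings and column names are assumptions.
CREATE_KEYSPACE = (
    f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE} "
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"
)
CREATE_TABLE = (
    f"CREATE TABLE IF NOT EXISTS {KEYSPACE}.stocks ("
    "symbol text, "
    "ts timestamp, "
    "open_price double, "
    "close_price double, "
    "PRIMARY KEY (symbol, ts)"
    ") WITH CLUSTERING ORDER BY (ts DESC)"
)


def create_schema(session):
    """Run both statements on a cassandra-driver Session (or a compatible object)."""
    session.execute(CREATE_KEYSPACE)
    session.execute(CREATE_TABLE)
```

Partitioning by symbol and clustering by timestamp keeps all rows for one ticker together, newest first, which suits per-ticker lookups. SimpleStrategy with a replication factor of 1 is only reasonable for a single-node test setup.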

Configure and Initialize Kafka Consumer:

consumer = Consumer({
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'group.id': 'stock_consumer_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    # ...
})
consumer.subscribe(['stocks-prices'])

Sets up the Kafka consumer with necessary configurations and subscribes to the topic.

4. Consume and Process Messages:

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    stock_data = json.loads(msg.value().decode('utf-8'))
    session.execute("INSERT INTO stocks_data.stocks ...", (stock_data['T'], stock_data['c'], stock_data['o'], stock_data['t']))

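Polls for a message for up to one second, skips empty polls and messages that carry errors, decodes the JSON payload, and inserts the symbol, close, open, and timestamp fields into Cassandra. The snippet handles a single poll; to consume continuously, it needs to run inside a loop.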
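A minimal sketch of such a loop, assuming that the 't' field is a Unix timestamp in milliseconds and that the caller supplies a store_row function that performs the Cassandra insert. The names to_row, consume, and max_idle_polls are hypothetical, and consumer is assumed to be a confluent_kafka Consumer or a compatible object:

```python
import json
from datetime import datetime, timezone


def to_row(stock_data):
    """Map one decoded message to (symbol, close, open, timestamp).

    Assumes 't' is a Unix timestamp in milliseconds.
    """
    ts = datetime.fromtimestamp(stock_data["t"] / 1000, tz=timezone.utc)
    return (stock_data["T"], stock_data["c"], stock_data["o"], ts)


def consume(consumer, store_row, max_idle_polls=5):
    """Poll until `max_idle_polls` consecutive polls are empty; return rows stored."""
    stored, idle = 0, 0
    while idle < max_idle_polls:
        msg = consumer.poll(1.0)  # wait up to one second for a message
        if msg is None:
            idle += 1
            continue
        idle = 0
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        try:
            row = to_row(json.loads(msg.value().decode("utf-8")))
        except (ValueError, KeyError) as exc:
            print(f"Skipping malformed message: {exc}")
            continue
        store_row(row)
        # Commit only after the write, so a crash re-reads rather than loses data.
        consumer.commit(message=msg, asynchronous=False)
        stored += 1
    return stored
```

Committing synchronously after every message gives at-least-once delivery at the cost of throughput; committing once per batch would be faster but would replay more messages after a crash.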

5. Commit Offsets and Shutdown:

consumer.commit()
consumer.close()
cluster.shutdown()

Commits message offsets and gracefully shuts down the consumer and Cassandra connection.
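If the processing code raises or the script is stopped with Ctrl+C, the shutdown calls above are never reached. A minimal sketch of one way to guarantee the cleanup, using try/finally; run_with_cleanup is a hypothetical helper, and work stands for whatever function runs the polling logic:

```python
def run_with_cleanup(consumer, cluster, work):
    """Run `work()` and always release the Kafka and Cassandra connections.

    `work` is any zero-argument callable, for example the polling loop.
    Returns True on normal completion and False if interrupted with Ctrl+C.
    """
    try:
        work()
        return True
    except KeyboardInterrupt:
        print("Interrupted, shutting down...")
        return False
    finally:
        # close() leaves the consumer group so its partitions are reassigned promptly.
        consumer.close()
        cluster.shutdown()
```

Other exceptions still propagate to the caller, but only after both connections have been closed.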

6. Querying from Apache Cassandra
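As a rough illustration of reading the data back from Python, the sketch below fetches the rows for one ticker. The table name stocks_data.stocks comes from the consumer code, but the column names symbol, ts, open_price, and close_price are hypothetical, as is the assumption that symbol is the partition key; latest_prices is a hypothetical helper that expects a cassandra-driver Session:

```python
def latest_prices(session, symbol, limit=5):
    """Return up to `limit` rows for one ticker as a list.

    Column names are hypothetical; adjust them to the real schema.
    """
    query = (
        "SELECT symbol, ts, open_price, close_price "
        "FROM stocks_data.stocks WHERE symbol = %s LIMIT %s"
    )
    return list(session.execute(query, (symbol, limit)))
```

Filtering on the partition key lets Cassandra answer the query from a single partition, so no ALLOW FILTERING clause is needed under these assumptions.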


Conclusion
This project integrates Apache Kafka and Apache Cassandra into a pipeline that extracts stock data from the Polygon.io API and stores it for later querying.

Key Achievements
Data Streaming: The Kafka producer fetches the grouped daily stock data for a given date and publishes one message per ticker, keyed by stock symbol.
Reliable Message Handling: The Kafka consumer reads from the stocks-prices topic, skips empty polls and messages that carry errors, and commits offsets manually.
Scalable Data Storage: Cassandra stores the consumed stock records, where they can be queried for analysis.