{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "467e2e79-8418-44af-93fd-d66de6bfab02", "metadata": {}, "source": [ "# Chapter 2: A Tour of PySpark Data Types" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e4f1e431-795b-4da9-87bd-0e0f0370cf69", "metadata": {}, "source": [ "## Basic Data Types in PySpark\n", "Understanding the basic data types in PySpark is crucial for defining DataFrame schemas and performing efficient data processing. Below is a detailed overview of each type, with descriptions, Python equivalents, and examples:" ] }, { "attachments": {}, "cell_type": "markdown", "id": "19f6805c-d3f9-47ff-8f27-7d3a8314e8bc", "metadata": {}, "source": [ "### Numerical Types\n", "\n", "ByteType\n", "Used to store byte-length integers ranging from `-128` to `127`. Ideal for storing small data efficiently.\n", "- Python Equivalent: `int` (`-128` to `127`)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 2, "id": "7e05a5cb-d814-4a43-9524-61a1b186871b", "metadata": {}, "outputs": [], "source": [ "byte_example = 127 # Maximum value for a signed byte" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c1afd5eb-ad40-4179-87bb-bd50b7b81882", "metadata": {}, "source": [ "ShortType\n", "Represents a short integer, storing values between `-32768` and `32767`. More efficient than using IntegerType for data with smaller numerical ranges.\n", "- Python Equivalent: `int` (`-32768` to `32767`)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 3, "id": "2055833e-e92e-4ffc-9ed9-87be78ffe9f0", "metadata": {}, "outputs": [], "source": [ "short_example = 32767 # Maximum value for a signed short" ] }, { "attachments": {}, "cell_type": "markdown", "id": "92e7e44a", "metadata": {}, "source": [ "IntegerType\n", "Used to store integer values. 
Ideal for counts, indices, and any discrete quantity.\n", "- Python Equivalent: `int` (`-2147483648` to `2147483647`)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 4, "id": "2d05fce6", "metadata": {}, "outputs": [], "source": [ "integer_example = 123" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ac0b7692", "metadata": {}, "source": [ "LongType\n", "Suitable for storing large integer values, often used for identifiers or large counts.\n", "- Python Equivalent: `int` (`-9223372036854775808` to `9223372036854775807`)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 5, "id": "28a7a514", "metadata": {}, "outputs": [], "source": [ "long_integer_example = 1234567890123456789" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0aad9f8c-f655-4803-9c01-fd8a4c520121", "metadata": {}, "source": [ "DoubleType\n", "Provides double precision floating-point numbers for accurate and precise calculations.\n", "- Python Equivalent: `float` (double precision)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 6, "id": "cc7f7e87-3046-49a2-8621-a238015deb20", "metadata": {}, "outputs": [], "source": [ "double_example = 12345.6789" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e5a314cb-2206-40a2-9565-3fba0763faf7", "metadata": {}, "source": [ "FloatType\n", "Used for floating-point numbers where less precision is acceptable in exchange for performance.\n", "- Python Equivalent: `float` (single precision)\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 7, "id": "5d9a0d47-ed4b-4cf6-b98d-629a4d2cfe10", "metadata": {}, "outputs": [], "source": [ "float_example = 123.456" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4fc2ca5a-f47f-484c-afb5-334555afb901", "metadata": {}, "source": [ "DecimalType\n", "Allows fixed precision and scale, used in scenarios requiring exact decimal representation, such as financial computations.\n", "- Python Equivalent: `decimal.Decimal`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 8, "id": "f2f3fb9e-97fa-416f-80c5-87e62ae78ab3", "metadata": {}, "outputs": [], "source": [ "from decimal import Decimal\n", "decimal_example = Decimal('12345.6789')" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e8b97b6e-94bb-4006-b3f6-daa8688e9f6a", "metadata": {}, "source": [ "### StringType\n", "Used for text data; supports Unicode and is capable of storing any string data.\n", "- Python Equivalent: `str`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 9, "id": "a30aba2f-b686-4382-accd-dd3d473181b7", "metadata": {}, "outputs": [], "source": [ "string_example = \"Hello, World!\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2e858a57-0e32-4806-b913-3ed74c9d40df", "metadata": {}, "source": [ "### BinaryType\n", "Used for raw byte data, such as file contents or images, stored as binary streams.\n", "- Python Equivalent: `bytes`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 10, "id": "40d8f6e6-47a6-4030-bb61-030e2c439f9b", "metadata": {}, "outputs": [], "source": [ "binary_example = b'Hello, binary world!'" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6ed47ac0-dfed-41f7-88aa-0947033fd451", "metadata": {}, "source": [ "### BooleanType\n", "Represents Boolean values, used extensively in conditional operations and filters.\n", "- Python Equivalent: `bool`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 11, "id": "1c0fc79e-cad2-4e0f-9c16-63805c0dd309", "metadata": {}, 
"outputs": [], "source": [ "boolean_example = True" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6de4ab5b-fb95-4bac-a72b-773d9808dd71", "metadata": {}, "source": [ "### Datetime Types\n", "\n", "DateType\n", "Used for dates without time, suitable for storing calendar dates like birthdays or specific days.\n", "- Python Equivalent: `datetime.date`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 12, "id": "aa99ee3b-ec22-484d-98d1-de6478090cf3", "metadata": {}, "outputs": [], "source": [ "from datetime import date\n", "date_example = date(2020, 1, 1)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "064f3f44-1e2e-46ab-953f-d8d971b1292e", "metadata": {}, "source": [ "TimestampType\n", "Stores both date and time, essential for recording precise moments in time, such as log timestamps.\n", "- Python Equivalent: `datetime.datetime`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 13, "id": "935536c7-edc2-402c-a157-650f4e98613b", "metadata": {}, "outputs": [], "source": [ "from datetime import datetime\n", "timestamp_example = datetime(2020, 1, 1, 12, 0)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c54c3210-7022-4adf-b364-8967a06703f9", "metadata": {}, "source": [ "### Creating a DataFrame from Python Objects in PySpark\n", "Here's how to define a schema and create a DataFrame in PySpark using the Python objects corresponding to each basic data type:" ] }, { "cell_type": "code", "execution_count": 14, "id": "42fcac44-3fb8-4e9e-82fa-1557275f04e2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+\n", "|integer_field| long_field|double_field|float_field|decimal_field| string_field| binary_field|boolean_field|date_field| timestamp_field|\n", "+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+\n", "| 123| 1234567890123456789| 12345.6789| 123.456| 12345.67| Hello, World!|[48 65 6C 6C 6F 2...| true|2020-01-01|2020-01-01 12:00:00|\n", "| 456| 9223372036854775807| 98765.4321| 987.654| 98765.43|Goodbye, World!|[47 6F 6F 64 62 7...| false|2025-12-31|2025-12-31 23:59:00|\n", "| -1|-1234567890123456789| -12345.6789| -123.456| -12345.67|Negative Values|[4E 65 67 61 74 6...| false|1990-01-01|1990-01-01 00:00:00|\n", "| 0| 0| 0.0| 0.0| 0.00| | []| true|2000-01-01|2000-01-01 00:00:00|\n", "+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, FloatType\n", "from pyspark.sql.types import DecimalType, StringType, BinaryType, BooleanType, DateType, TimestampType\n", "from decimal import Decimal\n", "from datetime import date, datetime\n", "\n", "# Define the schema of the DataFrame\n", "schema = StructType([\n", " StructField(\"integer_field\", IntegerType(), nullable=False),\n", " StructField(\"long_field\", LongType(), nullable=False),\n", " StructField(\"double_field\", DoubleType(), nullable=False),\n", " StructField(\"float_field\", FloatType(), nullable=False),\n", " StructField(\"decimal_field\", DecimalType(10, 2), nullable=False),\n", " StructField(\"string_field\", StringType(), 
nullable=False),\n", " StructField(\"binary_field\", BinaryType(), nullable=False),\n", " StructField(\"boolean_field\", BooleanType(), nullable=False),\n", " StructField(\"date_field\", DateType(), nullable=False),\n", " StructField(\"timestamp_field\", TimestampType(), nullable=False)\n", "])\n", "\n", "# Sample data using the Python objects corresponding to each PySpark type\n", "data = [\n", " (123, 1234567890123456789, 12345.6789, 123.456, Decimal('12345.67'), \"Hello, World!\",\n", " b'Hello, binary world!', True, date(2020, 1, 1), datetime(2020, 1, 1, 12, 0)),\n", " (456, 9223372036854775807, 98765.4321, 987.654, Decimal('98765.43'), \"Goodbye, World!\",\n", " b'Goodbye, binary world!', False, date(2025, 12, 31), datetime(2025, 12, 31, 23, 59)),\n", " (-1, -1234567890123456789, -12345.6789, -123.456, Decimal('-12345.67'), \"Negative Values\",\n", " b'Negative binary!', False, date(1990, 1, 1), datetime(1990, 1, 1, 0, 0)),\n", " (0, 0, 0.0, 0.0, Decimal('0.00'), \"\", b'', True, date(2000, 1, 1), datetime(2000, 1, 1, 0, 0))\n", "]\n", "\n", "# Create DataFrame\n", "df = spark.createDataFrame(data, schema=schema)\n", "\n", "# Show the DataFrame\n", "df.show()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2495a43f-58a8-4027-9987-457487656ee7", "metadata": {}, "source": [ "## Precision for Doubles, Floats, and Decimals\n", "Understanding precision in numerical data types is critical for data integrity, especially in fields requiring high accuracy such as financial analysis, scientific computation, and engineering. PySpark offers different data types to cater to these needs." ] }, { "attachments": {}, "cell_type": "markdown", "id": "18e8f2db-782a-484f-ac0c-da7aaf19c79b", "metadata": {}, "source": [ "FloatType\n", "`FloatType` in PySpark represents a single precision 32-bit IEEE 754 floating-point number. It's less precise but requires less storage and can be processed faster than DoubleType. This makes it suitable for applications where a large volume of numerical data needs to be processed quickly and extreme precision is not critical.\n", "Usage Scenario\n", "Useful in machine learning algorithms for faster computation when processing large datasets." ] }, { "attachments": {}, "cell_type": "markdown", "id": "3d1c9c29-4de9-478d-b336-3e141ef81dcc", "metadata": {}, "source": [ "DoubleType\n", "`DoubleType` corresponds to a double precision 64-bit IEEE 754 floating-point number. It provides a good balance between precision and performance and is suitable for most numerical calculations where precision is important.\n", "Usage Scenario\n", "Ideal for financial calculations where precision is more crucial than computational speed." ] }, { "attachments": {}, "cell_type": "markdown", "id": "0ef870f4-575a-4e1f-912c-d4c84180beaf", "metadata": {}, "source": [ "DecimalType\n", "`DecimalType` is used when dealing with high-precision fixed-scale decimal numbers. The precision and scale can be defined by the user, which makes it invaluable for applications such as financial reporting, where precise decimal representation helps avoid rounding errors.\n", "Usage Scenario\n", "Critical in accounting applications where calculations need to be accurate to the cent." 
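, "\n", "As a quick illustration of precision and scale, the following sketch (hypothetical column name and value) declares a `DecimalType(10, 2)` field, i.e. at most 10 digits in total with 2 of them after the decimal point:" ] }, { "cell_type": "code", "execution_count": null, "id": "0f3d2a1b-5c6d-4e7f-8a9b-0c1d2e3f4a5b", "metadata": {}, "outputs": [], "source": [ "from decimal import Decimal\n", "from pyspark.sql.types import StructType, StructField, DecimalType\n", "\n", "# DecimalType(precision, scale): up to 10 digits in total, 2 of them after the decimal point\n", "amount_schema = StructType([StructField(\"amount\", DecimalType(10, 2), nullable=False)])\n", "amount_df = spark.createDataFrame([(Decimal('12345678.91'),)], schema=amount_schema)\n", "amount_df.printSchema()"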
] }, { "attachments": {}, "cell_type": "markdown", "id": "4dc12018-9e48-47da-a898-c05ae37fb4ea", "metadata": {}, "source": [ "### Example: Calculating Financial Statistics\n", "This example demonstrates how to use different numerical data types in PySpark for financial calculations, such as aggregating revenues and calculating averages with appropriate precision." ] }, { "cell_type": "code", "execution_count": 15, "id": "144459bf-8e8e-4c38-b820-f259f225709f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+\n", "|Total_Revenue_Float|Average_Revenue_Float|Total_Revenue_Double|Average_Revenue_Double|Total_Revenue_Decimal|Average_Revenue_Decimal|\n", "+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+\n", "| 165,432.20| 55,144.07| 165,432.21| 55,144.07| 165,432.21| 55,144.07|\n", "+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+\n", "\n" ] } ], "source": [ "from decimal import Decimal\n", "\n", "from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, DecimalType\n", "from pyspark.sql.functions import sum, avg, col, format_number\n", "\n", "# Define the schema of the DataFrame\n", "schema = StructType([\n", " StructField(\"revenue_float\", FloatType(), nullable=False),\n", " StructField(\"revenue_double\", DoubleType(), nullable=False),\n", " StructField(\"revenue_decimal\", DecimalType(10, 2), nullable=False)\n", "])\n", "\n", "# Sample data\n", "data = [\n", " (12345.67, 12345.6789, Decimal('12345.68')),\n", " (98765.43, 98765.4321, Decimal('98765.43')),\n", " (54321.10, 54321.0987, Decimal('54321.10'))\n", "]\n", "\n", "# Create DataFrame\n", "df = spark.createDataFrame(data, schema=schema)\n", "\n", "# Calculations\n", "result = df.select(\n", " format_number(sum(col(\"revenue_float\")), 2).alias(\"Total_Revenue_Float\"),\n", " format_number(avg(col(\"revenue_float\")), 2).alias(\"Average_Revenue_Float\"),\n", " format_number(sum(col(\"revenue_double\")), 2).alias(\"Total_Revenue_Double\"),\n", " format_number(avg(col(\"revenue_double\")), 2).alias(\"Average_Revenue_Double\"),\n", " format_number(sum(col(\"revenue_decimal\")), 2).alias(\"Total_Revenue_Decimal\"),\n", " format_number(avg(col(\"revenue_decimal\")), 2).alias(\"Average_Revenue_Decimal\")\n", ")\n", "\n", "result.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "22e45ca5-d694-4bc1-83ac-ad1fc13fa745", "metadata": {}, "source": [ "## Complex Data Types in PySpark\n", "Complex data types in PySpark facilitate the handling of nested and structured data, which is essential for working with modern data formats like JSON, XML, and others commonly found in big data ecosystems. This section explores the primary complex data types available in PySpark: `ArrayType`, `StructType`, `MapType`, and their use cases." ] }, { "attachments": {}, "cell_type": "markdown", "id": "993de0fb-c568-42a2-85f0-5ab1bb80027d", "metadata": {}, "source": [ "ArrayType\n", "Allows storage of multiple values of the same type in a single column. 
Ideal for data that naturally forms a list, such as tags, categories, or historical data points.\n", "- Python Equivalent: `list`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 16, "id": "547ba1eb-e115-4c07-b554-b3f69e321042", "metadata": {}, "outputs": [], "source": [ "array_example = ['apple', 'banana', 'cherry']" ] }, { "attachments": {}, "cell_type": "markdown", "id": "66549a85-4e94-4bb7-adf3-547bc8f7e7e7", "metadata": {}, "source": [ "Usage Scenario\n", "Managing lists of items associated with each record, such as multiple phone numbers or email addresses for a single contact." ] }, { "attachments": {}, "cell_type": "markdown", "id": "c7f28820-4db9-4bb2-9290-c472585b1e6f", "metadata": {}, "source": [ "StructType\n", "Enables nesting of DataFrame columns, allowing complex and hierarchical data structures within a single DataFrame cell. Each field in a `StructType` can itself be a complex type. It's similar to a row in a DataFrame, typically used to encapsulate records with a structured schema.\n", "- Python Equivalent: `pyspark.sql.Row`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 17, "id": "51e2337c-4d81-4d12-8ebf-ed3bdbab9ce7", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Row\n", "struct_example = Row(name=\"John Doe\", age=30, address=Row(street=\"123 Elm St\", city=\"Somewhere\"))" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7bc34c76-0037-45e0-a640-488f393a9536", "metadata": {}, "source": [ "Usage Scenario\n", "Often used to represent a JSON object, enabling the manipulation of each JSON field as if it were a column in the DataFrame." ] }, { "attachments": {}, "cell_type": "markdown", "id": "a5049490-8a4d-4f2d-99ed-1554cd8a4ef9", "metadata": {}, "source": [ "MapType\n", "Represents a key-value pair in a DataFrame column, where each key and value can be of any data type. Useful for dynamically structured data.\n", "- Python Equivalent: `dict`\n", "Python Example" ] }, { "cell_type": "code", "execution_count": 18, "id": "6bb04ee8-b9f0-427f-88a7-be0cdc049192", "metadata": {}, "outputs": [], "source": [ "map_example = {'food': 'pizza', 'color': 'blue', 'car': 'Tesla'}" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7506bea9-6487-4202-bde4-3b0764ce92b2", "metadata": {}, "source": [ "Usage Scenario\n", "Storing and processing collections of key-value pairs within a single DataFrame column, like attributes of a product where keys are attribute names and values are attribute values." ] }, { "attachments": {}, "cell_type": "markdown", "id": "915a020f-b439-4da9-9090-7cdb54f32aef", "metadata": {}, "source": [ "### Example: Handling Complex Nested Data\n", "To illustrate the use of these complex data types, let's consider a practical example involving nested data structures such as a customer record that includes multiple addresses and preferences in various categories." 
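, "\n", "As a small sketch first (illustrative values only), one such record can be assembled from the Python objects introduced above; the complete DataFrame example follows below:" ] }, { "cell_type": "code", "execution_count": null, "id": "5a6b7c8d-9e0f-4a1b-8c2d-3e4f5a6b7c8d", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import Row\n", "\n", "# One customer record combining a list (ArrayType), nested Rows (StructType),\n", "# and a dict (MapType) -- values are illustrative\n", "customer_record = Row(\n", "    name=\"John Doe\",\n", "    addresses=[Row(street=\"123 Elm St\", city=\"Somewhere\", zip=\"12345\")],\n", "    preferences={\"food\": \"pizza\", \"color\": \"blue\"}\n", ")"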
] }, { "cell_type": "code", "execution_count": 19, "id": "1bee3e29-0cc3-4345-aa05-c5e85d4a9b6b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+---------------------------------------------------------------+---------------------------------------------+\n", "|name |addresses |preferences |\n", "+----------+---------------------------------------------------------------+---------------------------------------------+\n", "|John Doe |[{123 Elm St, Somewhere, 12345}, {456 Oak St, Anywhere, 67890}]|{color -> blue, car -> Tesla, food -> pizza} |\n", "|Jane Smith|[{789 Pine St, Everywhere, 10112}] |{color -> green, car -> Honda, food -> sushi}|\n", "+----------+---------------------------------------------------------------+---------------------------------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType\n", "from pyspark.sql import Row\n", "\n", "# Define the schema of the DataFrame\n", "schema = StructType([\n", " StructField(\"name\", StringType(), nullable=False),\n", " StructField(\"addresses\", ArrayType(\n", " StructType([\n", " StructField(\"street\", StringType(), nullable=False),\n", " StructField(\"city\", StringType(), nullable=False),\n", " StructField(\"zip\", StringType(), nullable=False)\n", " ])\n", " ), nullable=True),\n", " StructField(\"preferences\", MapType(StringType(), StringType()), nullable=True)\n", "])\n", "\n", "# Sample data using Row objects for StructType\n", "data = [\n", " Row(name=\"John Doe\",\n", " addresses=[Row(street=\"123 Elm St\", city=\"Somewhere\", zip=\"12345\"),\n", " Row(street=\"456 Oak St\", city=\"Anywhere\", zip=\"67890\")],\n", " preferences={\"food\": \"pizza\", \"color\": \"blue\", \"car\": \"Tesla\"}),\n", " Row(name=\"Jane Smith\",\n", " addresses=[Row(street=\"789 Pine St\", city=\"Everywhere\", zip=\"10112\")],\n", " preferences={\"food\": \"sushi\", \"color\": \"green\", \"car\": \"Honda\"})\n", "]\n", "\n", "# Create DataFrame\n", "df = spark.createDataFrame(data, schema=schema)\n", "\n", "# Show the DataFrame\n", "df.show(truncate=False)\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1fdd3857-2e87-46c7-bd34-f652b39dcf1c", "metadata": {}, "source": [ "In this example:\n", "- `ArrayType` is used to store multiple addresses for each customer.\n", "- `StructType` is nested within `ArrayType` to represent each address as a structured record.\n", "- `MapType` stores preferences as key-value pairs, allowing for dynamic data storage." ] }, { "attachments": {}, "cell_type": "markdown", "id": "9d374572-a06b-48dd-87e1-c21a89b6f6e7", "metadata": {}, "source": [ "## Casting Columns in PySpark\n", "Casting columns is a fundamental operation in data processing where the data type of a column in a DataFrame is converted from one type to another. PySpark provides straightforward methods that enable you to align input data types with the requirements of data processing operations or applications." ] }, { "attachments": {}, "cell_type": "markdown", "id": "13a44a7d-48cb-4548-a020-489b8753dc55", "metadata": {}, "source": [ "### How to Cast Columns\n", "To cast columns in PySpark, the `cast()` or `astype()` method can be used on a column. 
Here’s a complete example demonstrating how to perform basic casting operations:" ] }, { "cell_type": "code", "execution_count": 20, "id": "0bc63e7b-0819-4dff-bb42-db63f4b17928", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Original DataFrame:\n", "+------------+-------------+\n", "|float_column|string_column|\n", "+------------+-------------+\n", "| 123.456| 123|\n", "| 789.012| 456|\n", "| NULL| 789|\n", "+------------+-------------+\n", "\n", "DataFrame after Casting:\n", "+------------+-------------+-----------------+-------------------+\n", "|float_column|string_column|string_from_float|integer_from_string|\n", "+------------+-------------+-----------------+-------------------+\n", "| 123.456| 123| 123.456| 123|\n", "| 789.012| 456| 789.012| 456|\n", "| NULL| 789| NULL| 789|\n", "+------------+-------------+-----------------+-------------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import col\n", "from pyspark.sql.types import StructType, StructField, StringType, FloatType\n", "\n", "# Define the schema of the DataFrame\n", "schema = StructType([\n", " StructField(\"float_column\", FloatType(), nullable=True),\n", " StructField(\"string_column\", StringType(), nullable=True)\n", "])\n", "\n", "# Sample data\n", "data = [\n", " (123.456, \"123\"),\n", " (789.012, \"456\"),\n", " (None, \"789\")\n", "]\n", "\n", "# Create DataFrame\n", "df = spark.createDataFrame(data, schema=schema)\n", "\n", "# Display original DataFrame\n", "print(\"Original DataFrame:\")\n", "df.show()\n", "\n", "# Example of casting a float column to string\n", "df = df.withColumn('string_from_float', col('float_column').cast('string'))\n", "\n", "# Example of casting a string column to integer\n", "df = df.withColumn('integer_from_string', col('string_column').cast('integer'))\n", "\n", "# Display DataFrame after casting\n", "print(\"DataFrame after Casting:\")\n", "df.show()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "433312a0-9860-4879-8859-e94d711447b3", "metadata": {}, "source": [ "### Cast with Caution: Potential Data Loss\n", "When casting columns, it's important to be aware of how PySpark handles incompatible or invalid casting operations:\n", "\n", "Silent Conversion to Null\n", "- If ANSI mode is disabled, PySpark does not throw an error if a value cannot be converted to the desired type during casting. Instead, it overflows or converts the value to `null`. This behavior can lead to data loss in your dataset, which might not be immediately obvious.\n", "- If ANSI mode is enabled, PySpark throws an error in that case. If it is acceptable, use `try_cast` instead.\n", "\n", "Example: Checking for Data Loss\n", "- It's a good practice to check for unexpected nulls that result from casting operations, especially when converting from string to numeric types where formatting issues may cause failures." 
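, "\n", "Before that check, the cell below sketches the lenient alternative: `try_cast` (available as `Column.try_cast` in Spark 4.0) returns `NULL` for unconvertible values instead of raising, even when ANSI mode is enabled. The column name and values are illustrative:" ] }, { "cell_type": "code", "execution_count": null, "id": "9f0a1b2c-3d4e-4f5a-8b6c-7d8e9f0a1b2c", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import col\n", "\n", "# Sketch: try_cast yields NULL for values that cannot be converted,\n", "# regardless of the spark.sql.ansi.enabled setting\n", "sketch_df = spark.createDataFrame([(\"123\",), (\"abc\",)], [\"raw\"])\n", "sketch_df.withColumn(\"as_int\", col(\"raw\").try_cast(\"integer\")).show()"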
] }, { "cell_type": "code", "execution_count": 21, "id": "ccbd0999-89b1-47c4-80d8-b5b07dca627f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Original DataFrame:\n", "+---------------+\n", "|original_column|\n", "+---------------+\n", "| 123|\n", "| abc|\n", "| NULL|\n", "+---------------+\n", "\n", "DataFrame Showing Potential Data Loss:\n", "+---------------+-------------+\n", "|original_column|casted_column|\n", "+---------------+-------------+\n", "| abc| NULL|\n", "+---------------+-------------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import col\n", "from pyspark.sql.types import StructType, StructField, StringType\n", "\n", "# Disable ANSI mode\n", "spark.conf.set(\"spark.sql.ansi.enabled\", False)\n", "\n", "# Define the schema of the DataFrame\n", "schema = StructType([\n", " StructField(\"original_column\", StringType(), nullable=True)\n", "])\n", "\n", "# Sample data\n", "data = [\n", " (\"123\",), # Valid integer in string form\n", " (\"abc\",), # Invalid, will result in null when cast to integer\n", " (None,) # Original null, remains null\n", "]\n", "\n", "# Create DataFrame\n", "df = spark.createDataFrame(data, schema=schema)\n", "\n", "# Display original DataFrame\n", "print(\"Original DataFrame:\")\n", "df.show()\n", "\n", "# Add a new column with casted values\n", "df = df.withColumn('casted_column', col('original_column').cast('integer'))\n", "\n", "# Show rows where casting resulted in nulls but the original column had data\n", "print(\"DataFrame Showing Potential Data Loss:\")\n", "df.filter(col('original_column').isNotNull() & col('casted_column').isNull()).show()\n", "\n", "spark.conf.unset(\"spark.sql.ansi.enabled\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "07832bae-7065-4f27-a5c4-ea67a609d31d", "metadata": {}, "source": [ "### Best Practices for Casting\n", "\n", "Validate Data First\n", "- Before casting columns, especially when converting strings to numerical types, validate and clean your data to ensure it conforms to expected formats.\n", "\n", "Example: Checking if numeric strings are properly formatted before casting to integers" ] }, { "cell_type": "code", "execution_count": 22, "id": "a0ed0018-fb20-47b7-a652-a4c6c9c3100b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+\n", "|data|\n", "+----+\n", "| 100|\n", "| 300|\n", "+----+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import col, regexp_extract\n", "\n", "# Sample DataFrame with a string column\n", "df = spark.createDataFrame([(\"100\",), (\"20x\",), (\"300\",)], [\"data\"])\n", "\n", "# Checking and filtering rows where data can be safely cast to an integer\n", "valid_df = df.filter(regexp_extract(col(\"data\"), '^[0-9]+$', 0) != \"\")\n", "valid_df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f6608cab-c774-4d89-9f7e-6b739855988d", "metadata": {}, "source": [ "Use Explicit Schemas\n", "- When reading data, use explicit schemas to avoid incorrect data type inference, which can minimize the need for casting.\n", "\n", "Example: Specifying a schema when reading data to ensure correct data types are applied from the start" ] }, { "cell_type": "code", "execution_count": 23, "id": "e217a874-cbba-4041-a153-163433f2c9ff", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- Employee ID: integer (nullable = true)\n", " |-- Role: string (nullable = true)\n", " |-- Location: string (nullable = true)\n", "\n" ] } ], 
"source": [ "from pyspark.sql.types import StructType, StructField, IntegerType, StringType\n", "\n", "# Define a schema\n", "schema = StructType([\n", " StructField(\"Employee ID\", IntegerType(), True),\n", " StructField(\"Role\", StringType(), True),\n", " StructField(\"Location\", StringType(), True)\n", "])\n", "\n", "# Read data with an explicit schema\n", "df = spark.read.csv(\"../data/employees.csv\", schema=schema)\n", "df.printSchema()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "81d081eb-be08-41bd-9f9b-cc240214c662", "metadata": {}, "source": [ "## Semi-Structured Data Processing in PySpark\n", "This section explores PySpark’s capabilities for handling semi-structured data formats, particularly focusing on JSON and XML, and addresses approaches for managing VARIANT-like data, which is commonly used in some SQL databases." ] }, { "attachments": {}, "cell_type": "markdown", "id": "7b00a46d-52f4-47c4-8719-70a45f766634", "metadata": {}, "source": [ "### JSON Processing\n", "JSON is a widely used format in web services and data interchange. PySpark simplifies parsing JSON data into structured DataFrames, making it easy to manipulate and analyze.\n", "\n", "Key Functions\n", "- `from_json()`: Converts JSON strings into a DataFrame column with a structured data type.\n", "- `to_json()`: Converts columns of a DataFrame into JSON strings.\n", "\n", "Example: Parsing JSON Strings" ] }, { "cell_type": "code", "execution_count": 24, "id": "9b06236c-a1f1-479a-8c65-3f252a9a8474", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------+\n", "|parsed_json|\n", "+-----------+\n", "| {John, 30}|\n", "| {Jane, 25}|\n", "+-----------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import from_json, col\n", "from pyspark.sql.types import StructType, StructField, StringType, IntegerType\n", "\n", "json_schema = StructType([\n", " StructField(\"name\", StringType()),\n", " StructField(\"age\", IntegerType())\n", "])\n", "\n", "df = spark.createDataFrame([(\"{\\\"name\\\":\\\"John\\\", \\\"age\\\":30}\",), (\"{\\\"name\\\":\\\"Jane\\\", \\\"age\\\":25}\",)], [\"json_str\"])\n", "df.select(from_json(col(\"json_str\"), json_schema).alias(\"parsed_json\")).show()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "43152087-b2a8-4bc6-a709-41b27bc2a96a", "metadata": {}, "source": [ "Example: Reading and Processing JSON Data" ] }, { "cell_type": "code", "execution_count": 25, "id": "115a64a3-0ff2-41c6-bc54-2321c9d24203", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+--------------------+---------+\n", "| author| title| genre|\n", "+-------------+--------------------+---------+\n", "|George Orwell| 1984|Dystopian|\n", "| Jane Austen| Pride and Prejudice| Romance|\n", "| Mark Twain|Adventures of Huc...| Fiction|\n", "+-------------+--------------------+---------+\n", "\n" ] } ], "source": [ "df = spark.read.json('../data/books.json')\n", "df.select(\"author\", \"title\", \"genre\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "165ed955-b461-4524-8837-6485ebe33afe", "metadata": {}, "source": [ "### XML Processing\n", "\n", "
\n", "Note: This section applies to Spark 4.0\n", "
\n", "\n", "XML is another common format for semi-structured data, used extensively in various enterprise applications.\n", "Example: Reading and Processing XML Data" ] }, { "cell_type": "code", "execution_count": 26, "id": "e4651cd3-eba4-4bf2-83ca-e7185130f470", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------+--------------------+---------+\n", "| author| title| genre|\n", "+-------------+--------------------+---------+\n", "|George Orwell| 1984|Dystopian|\n", "| Jane Austen| Pride and Prejudice| Romance|\n", "| Mark Twain|Adventures of Huc...| Fiction|\n", "+-------------+--------------------+---------+\n", "\n" ] } ], "source": [ "df = spark.read \\\n", " .format('xml') \\\n", " .option('rowTag', 'book') \\\n", " .load('../data/books.xml')\n", "df.select(\"author\", \"title\", \"genre\").show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "69f4daf5-fe9a-4399-93d2-abba41152962", "metadata": {}, "source": [ "### Handling VARIANT Data Types in PySpark\n", "\n", "
\n", "Note: This section applies to Spark 4.0\n", "
\n", "\n", "With the introduction of the VARIANT data type, handling semi-structured data has become more streamlined. VARIANT types are designed to store data that doesn't conform to a fixed schema, such as JSON or XML, directly within a DataFrame column.\n", "\n", "Features of VARIANT in PySpark\n", "- **Flexibility**: VARIANT types can store data structures like JSON or XML without predefined schema constraints, offering high flexibility for data ingestion and manipulation.\n", "- **Integration**: Provides better integration with systems that use semi-structured data, allowing for more direct data exchanges and queries.\n", "\n", "Considerations When Using VARIANT\n", "- **Performance**: While VARIANT provides flexibility, it might impact performance due to its dynamic nature. It's important to test and optimize data operations involving VARIANT types.\n", "- **Compatibility**: Ensure that all parts of your data pipeline support VARIANT if you're leveraging this data type, especially when exporting data to external systems.\n", "\n", "Practical Example: Handling JSON Data with VARIANT\n", "This example demonstrates how VARIANT can be used to handle JSON data effectively in PySpark:" ] }, { "cell_type": "code", "execution_count": 27, "id": "f50668e1-ea8a-41bf-9bbf-850dff43f34c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- variant_data: variant (nullable = true)\n", "\n", "+-------------------------------------------------------+\n", "|variant_data |\n", "+-------------------------------------------------------+\n", "|1234567890123456789 |\n", "|12345.6789 |\n", "|\"Hello, World!\" |\n", "|true |\n", "|{\"attributes\":{\"key1\":\"value1\",\"key2\":\"value2\"},\"id\":1}|\n", "|{\"attributes\":{\"key1\":\"value3\",\"key2\":\"value4\"},\"id\":2}|\n", "+-------------------------------------------------------+\n", "\n", "+-------------------+----+------+------+\n", "| long_value| id| key1| key2|\n", "+-------------------+----+------+------+\n", "|1234567890123456789|NULL| NULL| NULL|\n", "| 12345|NULL| NULL| NULL|\n", "| NULL|NULL| NULL| NULL|\n", "| 1|NULL| NULL| NULL|\n", "| NULL| 1|value1|value2|\n", "| NULL| 2|value3|value4|\n", "+-------------------+----+------+------+\n", "\n" ] }, { "data": { "text/plain": [ "[1234567890123456789,\n", " Decimal('12345.6789'),\n", " 'Hello, World!',\n", " True,\n", " {'attributes': {'key1': 'value1', 'key2': 'value2'}, 'id': 1},\n", " {'attributes': {'key1': 'value3', 'key2': 'value4'}, 'id': 2}]" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from datetime import date, datetime\n", "from decimal import Decimal\n", "\n", "from pyspark.sql.functions import try_parse_json, try_variant_get, col\n", "\n", "# Sample JSON data\n", "data = [\n", " '1234567890123456789',\n", " '12345.6789',\n", " '\"Hello, World!\"',\n", " 'true',\n", " '{\"id\": 1, \"attributes\": {\"key1\": \"value1\", \"key2\": \"value2\"}}',\n", " '{\"id\": 2, \"attributes\": {\"key1\": \"value3\", \"key2\": \"value4\"}}',\n", "]\n", "\n", "# Load data into DataFrame with VARIANT\n", "df = spark.createDataFrame(data, StringType()).select(try_parse_json(col(\"value\")).alias(\"variant_data\"))\n", "df.printSchema()\n", "df.show(truncate=False)\n", "\n", "# Accessing elements inside the VARIANT\n", "df.select(\n", " try_variant_get(col(\"variant_data\"), \"$\", \"long\").alias(\"long_value\"),\n", " try_variant_get(col(\"variant_data\"), \"$.id\", \"int\").alias(\"id\"),\n", " 
try_variant_get(col(\"variant_data\"), \"$.attributes.key1\", \"string\").alias(\"key1\"),\n", " try_variant_get(col(\"variant_data\"), \"$.attributes.key2\", \"string\").alias(\"key2\"),\n", ").show()\n", "\n", "# Collect data and convert to Python objects\n", "[row[\"variant_data\"].toPython() for row in df.collect()]" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }