From 1015db663f395d948c71b94d7ce9a6aa07c55cce Mon Sep 17 00:00:00 2001 From: Tom McKeesick Date: Tue, 19 Sep 2023 07:32:54 +1000 Subject: [PATCH 01/10] create article for python BOMs --- README.md | 5 +++++ .../20230919_parsing_boms_in_python.md | 1 + 2 files changed, 6 insertions(+) create mode 100644 articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md diff --git a/README.md b/README.md index 919157b..046f8ff 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ my blog --- +### [20230919 Parsing BOMs in Python](articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md) + +> _How to detect/read/write UTF 8/16 BOMs_ + ### [20230704 Jupyter Cell Wrappers](articles/20230704_jupyter_cell_wrappers/20230704_jupyter_cell_wrappers.md) > _Adding decorator-style functionality to jupyter cells_ @@ -69,3 +73,4 @@ my blog + diff --git a/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md new file mode 100644 index 0000000..ff5dbaf --- /dev/null +++ b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md @@ -0,0 +1 @@ +# 20230919 Parsing BOMs in Python From 7a8006838e20028b42bbdcf43334663de71082d2 Mon Sep 17 00:00:00 2001 From: Tom McKeesick Date: Tue, 19 Sep 2023 07:48:01 +1000 Subject: [PATCH 02/10] add code --- .../20230919_parsing_boms_in_python.md | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md index ff5dbaf..85678f3 100644 --- a/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md +++ b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md @@ -1 +1,34 @@ # 20230919 Parsing BOMs in Python + +```python + import csv, codecs + + CODECS = { + "utf-8-sig": [codecs.BOM_UTF8], + "utf-16": [ + codecs.BOM_UTF16, + codecs.BOM_UTF16_BE, + codecs.BOM_UTF16_LE, + ] + } + + def detect_encoding(fpath): + with open(fpath, 'rb') as istream: + data = istream.read(3) + for encoding, boms in CODECS.items(): + if any(data.startswith(bom) for bom in boms): + return encoding + return 'utf-8' + + def read(fpath): + with open(fpath, 'r', encoding=detect_encoding(fpath)) as istream: + yield from csv.DictReader(istream) +``` + +```python + # run here + for i, row in enumerate(read('test.csv')): + print(i, row) + if i > 10: + break +``` From b9fd6d74b263d4cc8f4d9aa02aa65a74753ae0d7 Mon Sep 17 00:00:00 2001 From: Tom McKeesick Date: Sun, 28 Jan 2024 21:14:56 +1100 Subject: [PATCH 03/10] update --- .../20230605_pyspark_fu-checkpoint.ipynb | 1610 +++++++++++++++++ .../20230605_pyspark_fu.ipynb | 238 ++- articles/20230605_pyspark_fu/not_there.json | 1 + articles/20230605_pyspark_fu/tmp.json | 3 + .../tmp_out.json/._SUCCESS.crc | Bin 0 -> 8 bytes ...-6eae-4445-9d8e-03f27cf106e9-c000.json.crc | Bin 0 -> 12 bytes .../20230605_pyspark_fu/tmp_out.json/_SUCCESS | 0 ...fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json | 3 + 8 files changed, 1794 insertions(+), 61 deletions(-) create mode 100644 articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb create mode 100644 articles/20230605_pyspark_fu/not_there.json create mode 100644 articles/20230605_pyspark_fu/tmp.json create mode 100644 articles/20230605_pyspark_fu/tmp_out.json/._SUCCESS.crc create mode 100644 
articles/20230605_pyspark_fu/tmp_out.json/.part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json.crc create mode 100644 articles/20230605_pyspark_fu/tmp_out.json/_SUCCESS create mode 100644 articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json diff --git a/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb new file mode 100644 index 0000000..902d3a7 --- /dev/null +++ b/articles/20230605_pyspark_fu/.ipynb_checkpoints/20230605_pyspark_fu-checkpoint.ipynb @@ -0,0 +1,1610 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "1a22b5d9", + "metadata": {}, + "source": [ + "# Pyspark Fu\n", + "\n", + "- [1. Initialising the Spark Session](#1-initialising-the-spark-session)\n", + "- [2. Create a simple dataframe for debugging](#2-create-a-simple-dataframe-for-debugging)\n", + "- [3. Joins](#3-joins)\n", + " - [3.1. Avoid duplicate column names](#31-avoid-duplicate-column-names)\n", + " - [3.1.2 Join using list of names](#312-join-using-list-of-names)\n", + " - [3.1.3 Dataframe aliasing is a bit weird](#313-dataframe-aliasing-is-a-bit-weird)\n", + "- [4. The Schema format](#4-the-schema-format)\n", + " - [4.1 Simple Schema Utility Class](#41-simple-schema-utility-class)\n", + " - [4.2 Schema Inference Debacles](#42-schema-inference-debacles)\n" + ] + }, + { + "cell_type": "markdown", + "id": "6e2e9929", + "metadata": {}, + "source": [ + "## 1. Initialising the Spark Session" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "938cae9a-282a-47ea-9828-0d082d94774c", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.conf import SparkConf\n", + "from pyspark.sql import SparkSession\n", + "\n", + "CONF = {\n", + " 'spark.ui.showConsoleProgress': 'false',\n", + " 'spark.ui.dagGraph.retainedRootRDDs': '1',\n", + " 'spark.ui.retainedJobs': '1',\n", + " 'spark.ui.retainedStages': '1',\n", + " 'spark.ui.retainedTasks': '1',\n", + " 'spark.sql.ui.retainedExecutions': '1',\n", + " 'spark.worker.ui.retainedExecutors': '1',\n", + " 'spark.worker.ui.retainedDrivers': '1',\n", + " 'spark.executor.instances': '1',\n", + "}\n", + "\n", + "def spark_session() -> SparkSession:\n", + " '''\n", + " - set a bunch of spark config variables that help lighten the load\n", + " - local[1] locks the spark runtime to a single core\n", + " - silence noisy warning logs\n", + " '''\n", + " conf = SparkConf().setAll([(k,v) for k,v in CONF.items()])\n", + "\n", + " sc = SparkSession.builder.master('local[1]').config(conf=conf).getOrCreate()\n", + " sc.sparkContext.setLogLevel('ERROR')\n", + " return sc" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "5a9afc91-dafc-4e2c-98f4-ecc8e3b876ce", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Setting default log level to \"WARN\".\n", + "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", + "23/08/30 18:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + ] + } + ], + "source": [ + "spark = spark_session()" + ] + }, + { + "cell_type": "markdown", + "id": "e7dbdeb5", + "metadata": {}, + "source": [ + "## 2. 
Create a simple dataframe for debugging\n", + "\n", + "\n", + "- The pyspark official docs don't often \"create\" the dataframe that the code examples refer to" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "d4a27d70-e893-4810-92a6-7ffa43b11c15", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------+\n", + "|a |n |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "05f0f0b4-a002-4947-b340-cb38912be8aa", + "metadata": {}, + "source": [ + "## 3. Joins\n", + "\n", + "### 3.1. Avoid duplicate column names" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "54888af5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+\n", + "| id| name|\n", + "+---+---------+\n", + "|123| pikachu|\n", + "|999| evee|\n", + "|007|charizard|\n", + "+---+---------+\n", + "\n", + "+---+-----+\n", + "| id| name|\n", + "+---+-----+\n", + "|123| ash|\n", + "|999|chloe|\n", + "|007| ash|\n", + "+---+-----+\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "(None, None)" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Let's construct two dataframes that share a column to join on\n", + "\n", + "df1 = spark.createDataFrame([\n", + " {'id': '123', 'name': 'pikachu'},\n", + " {'id': '999', 'name': 'evee'},\n", + " {'id': '007', 'name': 'charizard'},\n", + "])\n", + "df2 = spark.createDataFrame([\n", + " {'id': '123', 'name': 'ash'},\n", + " {'id': '999', 'name': 'chloe'},\n", + " {'id': '007', 'name': 'ash'},\n", + "])\n", + "\n", + "df1.show(), df2.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "6eb73a7d", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+---+-----+\n", + "| id| name| id| name|\n", + "+---+---------+---+-----+\n", + "|007|charizard|007| ash|\n", + "|123| pikachu|123| ash|\n", + "|999| evee|999|chloe|\n", + "+---+---------+---+-----+\n", + "\n" + ] + } + ], + "source": [ + "# Now, lets join them together into a combined pokemon-and-trainer table\n", + "joined = df1.join(\n", + " df2,\n", + " on=df1['id'] == df2['id'],\n", + " how='inner',\n", + ")\n", + "joined.show()" + ] + }, + { + "cell_type": "markdown", + "id": "af898b40", + "metadata": {}, + "source": [ + "This _seems_ fine initially, but spark blows up as soon as you try and use the 'id' column in an expression\n", + "\n", + "This example will produce the error:\n", + "\n", + "`[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].`\n", + "\n", + "This can be particularly annoying as the error will only appear when you attempt to use the columns, but will go undetected if this doesn't happen" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "5cab4eb2", + "metadata": {}, + "outputs": [], + "source": [ + "import pyspark.sql.utils\n", + "from pyspark.sql import DataFrame\n", + "from typing import List\n", + "\n", + "def try_select(df: DataFrame, cols: 
List[str]):\n", + " try:\n", + " df.select(*cols).show()\n", + "\n", + " except pyspark.sql.utils.AnalysisException as e:\n", + " print('select failed!', e)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "34f0c2ac", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "select failed! [AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`id`, `id`].\n" + ] + } + ], + "source": [ + "try_select(joined, ['id', 'name', 'trainer'])" + ] + }, + { + "cell_type": "markdown", + "id": "012d4744", + "metadata": {}, + "source": [ + "The solution: use a different parameter for the `on` columns\n", + "\n", + "### 3.1.2 Join using list of names" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "c0bc54b2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+-----+\n", + "| id| name| name|\n", + "+---+---------+-----+\n", + "|007|charizard| ash|\n", + "|123| pikachu| ash|\n", + "|999| evee|chloe|\n", + "+---+---------+-----+\n", + "\n", + "select failed! [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`name`, `name`].\n" + ] + } + ], + "source": [ + "joined = df1.join(\n", + " df2,\n", + " on=['id'],\n", + " how='inner',\n", + ")\n", + "joined.show()\n", + "\n", + "# Now let's try that same select again\n", + "try_select(joined, ['id', 'name', 'trainer'])" + ] + }, + { + "cell_type": "markdown", + "id": "414bf5ac", + "metadata": {}, + "source": [ + "### 3.1.3 Dataframe aliasing is a bit weird" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "8b46a846", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+\n", + "| id| name|\n", + "+---+---------+\n", + "|123| pikachu|\n", + "|999| evee|\n", + "|007|charizard|\n", + "+---+---------+\n", + "\n" + ] + } + ], + "source": [ + "df1.alias('pokemon').select('*').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "ccae01f4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------+---+-----+\n", + "| id| name| id| name|\n", + "+---+---------+---+-----+\n", + "|007|charizard|007| ash|\n", + "|123| pikachu|123| ash|\n", + "|999| evee|999|chloe|\n", + "+---+---------+---+-----+\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "['id', 'name', 'id', 'name']" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import pyspark.sql.functions as F\n", + "\n", + "joined = df1.alias('pokemon').join(\n", + " df2.alias('trainers'),\n", + " on=F.col('pokemon.id') == F.col('trainers.id'),\n", + " how='inner',\n", + ")\n", + "joined.show()\n", + "joined.columns" + ] + }, + { + "cell_type": "markdown", + "id": "19620ae6", + "metadata": {}, + "source": [ + "Now, our error message is much better, as it contains the dataframe aliases identifying which table the duplicate column name is from" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "0d0a82b2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "select failed! 
[AMBIGUOUS_REFERENCE] Reference `id` is ambiguous, could be: [`pokemon`.`id`, `trainers`.`id`].\n" + ] + } + ], + "source": [ + "try_select(joined, ['id'])" + ] + }, + { + "cell_type": "markdown", + "id": "6d393943", + "metadata": {}, + "source": [ + "Confusingly, using `Dataframe.columns` does not show the aliases, but they are usable when selecting" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "3be334bf", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['id', 'name', 'id', 'name']\n", + "+---+\n", + "| id|\n", + "+---+\n", + "|007|\n", + "|123|\n", + "|999|\n", + "+---+\n", + "\n" + ] + } + ], + "source": [ + "print(joined.columns)\n", + "\n", + "try_select(joined, ['pokemon.id'])" + ] + }, + { + "cell_type": "markdown", + "id": "ffc67166", + "metadata": {}, + "source": [ + "## 4. The Schema format" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "f6fd3f01", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------+\n", + "|a |n |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "ed1c6c0f", + "metadata": {}, + "source": [ + "Every dataframe has a schema attached, which is a nested object using `StructType` as its root" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "0602fafb", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "StructType([StructField('a', StringType(), True), StructField('n', MapType(StringType(), StringType(), True), True)])" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.schema" + ] + }, + { + "cell_type": "markdown", + "id": "685d28e1", + "metadata": {}, + "source": [ + "This form is fine to use, but can't really stored inside a config file, or passed between systems as parameters. 
In order to do this, its possible to convert a pyspark schema `StructType` object to JSON, and back again" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "47eacbe0", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"fields\":[{\"metadata\":{},\"name\":\"a\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"n\",\"nullable\":true,\"type\":{\"keyType\":\"string\",\"type\":\"map\",\"valueContainsNull\":true,\"valueType\":\"string\"}}],\"type\":\"struct\"}\n" + ] + } + ], + "source": [ + "# Convert schema to JSON string\n", + "json_string = df.schema.json()\n", + "print(json_string)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "a364e739", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "StructType([StructField('a', StringType(), True), StructField('n', MapType(StringType(), StringType(), True), True)])" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Convert JSON string to schema\n", + "from pyspark.sql import types as T\n", + "import json\n", + "\n", + "T.StructType.fromJson(json.loads(json_string))" + ] + }, + { + "cell_type": "markdown", + "id": "4baa1603", + "metadata": {}, + "source": [ + "> _You may notice that although the method is called `fromJson`, the method actually accepts a `dictionary`, not a JSON string!_" + ] + }, + { + "cell_type": "markdown", + "id": "8e7abd85", + "metadata": {}, + "source": [ + "### 4.1 Simple Schema Utility Class\n", + "\n", + "Bundling a few helper methods into a utility class is very handy when dealing with pyspark schemas!\n", + "\n", + "Generally speaking, you'll often need to\n", + "\n", + "- Grab a schema from an existing dataframe, to inspect or store\n", + "- Convert that schema to JSON, so that it can be stored or passed around easily\n", + "- Create a pyspark schema from input JSON\n", + "- Print the schema in a human-readable form (multi-line JSON works best)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "50006bd4", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "import json\n", + "\n", + "from pyspark.sql import DataFrame\n", + "\n", + "@dataclass\n", + "class Schema:\n", + " schema: T.StructType\n", + " \n", + " def from_json(j): return Schema(T.StructType.fromJson(json.loads(j)))\n", + " def from_df(df): return Schema(df.schema)\n", + "\n", + " def as_json(self): return self.schema.json()\n", + " def as_dict(self): return json.loads(self.as_json())\n", + " def show(self): print(json.dumps(self.as_dict(), indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "a4c6092e", + "metadata": {}, + "source": [ + "> This class is instantiated with a `df.schema`, however it's a good idea to provide some static methods like `from_df` and `from_json` so that it's flexible" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "10f2516f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"n\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"string\"\n", + " }\n", + " }\n", + " ],\n", + " \"type\": 
\"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(df).show()" + ] + }, + { + "cell_type": "markdown", + "id": "289aa5fb", + "metadata": {}, + "source": [ + "### 4.2 Schema Inference Debacles\n", + "\n", + "I've found that pyspark will infer different schemas depending on how the DataFrame is initialised, namely when\n", + "\n", + "- using `spark.read.json`\n", + "- using `spark.createDataFrame`" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "69daf2e4", + "metadata": {}, + "outputs": [], + "source": [ + "row = {'id': 123, 'key': 'yolo', 'attrs': {'a': 1}}" + ] + }, + { + "cell_type": "markdown", + "id": "76af69f1", + "metadata": {}, + "source": [ + "#### 4.2.1 Inferring schema when reading JSON" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "495ce8a7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "# write the row to a file\n", + "json.dump(row, open('not_there.json', 'w'))\n", + "\n", + "Schema(spark.read.json('not_there.json').schema).show()" + ] + }, + { + "cell_type": "markdown", + "id": "2593458e", + "metadata": {}, + "source": [ + "Here, `attrs` has been detected as a `struct` object, which contains a field `a` of type `long`" + ] + }, + { + "cell_type": "markdown", + "id": "7577e2d0", + "metadata": {}, + "source": [ + "#### 4.2.2 Inferring schema when using createDataFrame\n", + "\n", + "Here, `attrs` has been detected as a `map` object, which contains string keys, with a value of type `long`" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "ad644671", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"long\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(\n", + " spark.createDataFrame([row])\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "e30ac195", + "metadata": {}, + "source": [ + "#### 4.2.3 Bonus\n", + "\n", + "To carry the madness forward, it's possible to force either `spark.read.json` or `spark.createDataFrame` to use the schema produced by the other, seemingly without consequences (that I've found so far!)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "a37183b0", + "metadata": 
{}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "# Use the inferred schema from reading the file with createDataFrame\n", + "\n", + "Schema.from_df(\n", + " spark.createDataFrame([row], schema=spark.read.json('not_there.json').schema)\n", + ").show()" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "5e9dec7a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"long\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "# Use the inferred schema from createDataFrame when reading the file\n", + "\n", + "Schema.from_df(\n", + " spark.read.json('not_there.json', schema=spark.createDataFrame([row]).schema)\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "7e8bdf9d-87c3-4347-9712-e49a22fea92c", + "metadata": {}, + "source": [ + "## 5. Default empty DataFrames\n", + "\n", + "Sometimes it's handy to be able to instantiate an \"empty\" dataframe in the case that a file/some source data is missing" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "f70151d6-b305-45ff-a680-81f3a7d0c3a7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[PATH_NOT_FOUND] Path does not exist: file:/Users/tomm/dev/tmck-code.github.io/articles/20230605_pyspark_fu/optional_source.json.\n" + ] + } + ], + "source": [ + "# This will result in an AnalysisException complaining that \n", + "# the file did not exist\n", + "from pyspark.errors.exceptions.captured import AnalysisException\n", + "\n", + "try:\n", + " spark.read.json('optional_source.json')\n", + "except AnalysisException as e:\n", + " print(e)" + ] + }, + { + "cell_type": "markdown", + "id": "e673e8c9-9bf9-47a1-9926-d3d94fdfcb57", + "metadata": {}, + "source": [ + "We can mitigate this by catching the exception, and creating a dataframe that matches the schema, but has 0 rows.\n", + "\n", + "This ensures that any queries on the dataframe will still work, as all the columns will exist with the correct type.\n", + "\n", + "_**This requires that we know the schema of the optional file**_\n", + "\n", + "\n", + "The easiest way to create a schema is usually to create a single-line file containing a valid line that matches the expected schema. 
Then, read that file into a dataframe and capture the schema for re-use (read: copy/paste)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "6d76f75b-ebe8-47d4-a14b-67ade411e5ef", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "with open('not_there.json', 'w') as ostream:\n", + " ostream.write(json.dumps({\n", + " 'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}\n", + " }))\n", + "\n", + "Schema(spark.read.json('not_there.json').schema).show()" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "d523bafc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"keyType\": \"string\",\n", + " \"type\": \"map\",\n", + " \"valueContainsNull\": true,\n", + " \"valueType\": \"string\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema.from_df(spark.createDataFrame([\n", + " {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}\n", + "])).show()" + ] + }, + { + "cell_type": "markdown", + "id": "bf55be1d-bf2b-4368-87c0-2a28974dc262", + "metadata": {}, + "source": [ + "I've never found a way (using StringIO or similar) to achieve this without writing a file - if you find a way then let me know!\n", + "\n", + "Let's bundle this up into a method that tidies up after itself:" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "99aca1da-147e-4ab4-ada4-c2642c066830", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import os\n", + "\n", + "def guess_schema(row: dict, tmp_fpath: str = 'tmp.json') -> dict:\n", + " with open(tmp_fpath, 'w') as ostream:\n", + " ostream.write(json.dumps({\n", + " 'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}\n", + " })) \n", + " schema = json.loads(spark.read.json('not_there.json').schema.json())\n", + " os.remove(tmp_fpath)\n", + "\n", + " return schema" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "fb184808-7c85-4506-ab9d-f7f2849eee70", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"attrs\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + 
" \"type\": \"struct\"\n", + " }\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"id\",\n", + " \"nullable\": true,\n", + " \"type\": \"long\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"key\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "schema = guess_schema(\n", + " {'id': 123, 'key': 'yolo', 'attrs': {'a': 'b'}}\n", + ")\n", + "print(json.dumps(schema, indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "45033389-3134-42cb-aafd-2a6b38161cea", + "metadata": {}, + "source": [ + "As you can see from this quick demo, it isn't quick to craft pyspark schemas from hand! In my experience it's prone to much human error and frustrating debugging, especially as schemas can grow large very quickly!\n", + "\n", + "Now, we can tie this into the method to safely load/create a dataframe" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "05059828-375c-4a63-b8b4-766bcbf6568d", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.errors.exceptions.captured import AnalysisException\n", + "import pyspark.sql.types as T\n", + "\n", + "def safe_load(fpath: str, schema: dict):\n", + " try:\n", + " return spark.read.json(fpath)\n", + " except AnalysisException as e:\n", + " print(e)\n", + " return spark.createDataFrame([], schema=T.StructType.fromJson(schema))" + ] + }, + { + "cell_type": "markdown", + "id": "838b9778-c361-49d0-a111-b04858069fc6", + "metadata": {}, + "source": [ + "> Side note: the method to convert a dict to a StructType (schema) is confusingly named `fromJson` despite the fact that the method accepts a dict, not a JSON string" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "1a81a746-f9b6-4ed7-b34f-8e0c0375b511", + "metadata": {}, + "outputs": [], + "source": [ + "df = safe_load('not_there.json', schema)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "1649aefc-7d18-48fe-9022-cbd0296bda5e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+---+----+\n", + "|attrs| id| key|\n", + "+-----+---+----+\n", + "| {b}|123|yolo|\n", + "+-----+---+----+\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "ce755e33-0f8e-4fd8-8e52-d6650fd23f78", + "metadata": {}, + "source": [ + "After the initial generation, the schema can be stored in a file and loaded or just defined directly in the code, rather than \"guessed\" every time" + ] + }, + { + "cell_type": "markdown", + "id": "94b81a00", + "metadata": {}, + "source": [ + "## 6. Generating JSON output with dynamic keys" + ] + }, + { + "cell_type": "markdown", + "id": "707a4444", + "metadata": {}, + "source": [ + "To demonstrate the problem, we will\n", + "1. read in a JSON file matching the dataframe above, with a few different nested types (e.g. strings, numbers, and null)\n", + "2. remove any key/values pairs with null values\n", + "3. 
write a JSON file where all nested types match the input" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "ac71e1c5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+---+----+\n", + "|attrs|id |key |\n", + "+-----+---+----+\n", + "|{b} |123|yolo|\n", + "+-----+---+----+\n", + "\n" + ] + } + ], + "source": [ + "example_df = spark.createDataFrame([\n", + " {'a': 'b', 'n': {'a': 'b'}},\n", + " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + "])\n", + "\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "629ec980", + "metadata": {}, + "outputs": [], + "source": [ + "data = [\n", + " {'a': 'b', 'n': {'a': True, 'z': ['1', 7, True]}},\n", + " {'a': 'c', 'n': {'a': 'b', 'z': 'x', 't': None}},\n", + " {'a': 'd', 'n': {'a': 3, 'z': None}},\n", + "]\n", + "print('\\n'.join(json.dumps(r) for r in data), file=open('tmp.json', 'w'))" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "f2e5e5de", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"a\": \"b\", \"n\": {\"a\": true, \"z\": [\"1\", 7, true]}}\n", + "{\"a\": \"c\", \"n\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", + "{\"a\": \"d\", \"n\": {\"a\": 3, \"z\": null}}\n" + ] + } + ], + "source": [ + "cat tmp.json" + ] + }, + { + "cell_type": "markdown", + "id": "bc13635c", + "metadata": {}, + "source": [ + "Now, to read the file with spark" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "d28ed31f", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+--------------------------+\n", + "|a |n |\n", + "+---+--------------------------+\n", + "|b |{true, null, [\"1\",7,true]}|\n", + "|c |{b, null, x} |\n", + "|d |{3, null, null} |\n", + "+---+--------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.read.json('tmp.json')\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "06bd8002", + "metadata": {}, + "source": [ + "This looks a little weird! 
This is due to the fact that spark.createDataFrame and spark.read.json can be given an identical input table and infer different schemas" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "bfa038b5", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"n\",\n", + " \"nullable\": true,\n", + " \"type\": {\n", + " \"fields\": [\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"a\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"t\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " },\n", + " {\n", + " \"metadata\": {},\n", + " \"name\": \"z\",\n", + " \"nullable\": true,\n", + " \"type\": \"string\"\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + " }\n", + " }\n", + " ],\n", + " \"type\": \"struct\"\n", + "}\n" + ] + } + ], + "source": [ + "Schema(df.schema).show()" + ] + }, + { + "cell_type": "markdown", + "id": "fcfa1d9b", + "metadata": {}, + "source": [ + "You might notice that spark has inferred all " + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "id": "e48f5b25", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |n |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> null} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.read.json('tmp.json', schema=example_df.schema)\n", + "df.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "id": "501cd405", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |n |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "import pyspark.sql.functions as F\n", + "\n", + "df_filtered = df.select(\n", + " F.col('a'),\n", + " F.map_filter(F.col('n'), lambda k,v: v.isNotNull()).alias('n')\n", + ")\n", + "df_filtered.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "f8dcd270", + "metadata": {}, + "source": [ + "The performance impact of using a python UDF can be mitigated by using a pure SQL statement" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "id": "24efbc9c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3} |\n", + "+---+------------------------------+\n", + "\n" + ] + } + ], + "source": [ + "df.createOrReplaceTempView(\"df\")\n", + "df_filtered = spark.sql(\n", + " \"select a, map_filter(n, (k,v) -> v is not null) custom from df\"\n", + ")\n", + "df_filtered.show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "049d7363", + "metadata": {}, + "outputs": [], + "source": [ + 
"df_filtered.write.format('json').mode('overwrite').save('tmp_out.json')" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "id": "665c9b31", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\"}}\n" + ] + } + ], + "source": [ + "cat tmp_out.json/part*" + ] + }, + { + "cell_type": "markdown", + "id": "9b8d80d5", + "metadata": {}, + "source": [ + "As we can see here, this is very close! All of the types have been _somewhat_ preserved:\n", + "\n", + "- All strings are still strings\n", + "- All other types are JSON-encoded strings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01b26da2", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb index 902d3a7..13e729d 100644 --- a/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb +++ b/articles/20230605_pyspark_fu/20230605_pyspark_fu.ipynb @@ -73,7 +73,7 @@ "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", - "23/08/30 18:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" + "23/11/22 11:24:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" ] } ], @@ -1270,7 +1270,7 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 147, "id": "ac71e1c5", "metadata": {}, "outputs": [ @@ -1278,43 +1278,45 @@ "name": "stdout", "output_type": "stream", "text": [ - "+-----+---+----+\n", - "|attrs|id |key |\n", - "+-----+---+----+\n", - "|{b} |123|yolo|\n", - "+-----+---+----+\n", + "+---+---------------------------+\n", + "|a |custom |\n", + "+---+---------------------------+\n", + "|b |{a -> b} |\n", + "|c |{y -> b, z -> x} |\n", + "|d |{2 -> 3, t -> a, o -> null}|\n", + "+---+---------------------------+\n", "\n" ] } ], "source": [ "example_df = spark.createDataFrame([\n", - " {'a': 'b', 'n': {'a': 'b'}},\n", - " {'a': 'c', 'n': {'z': 'x', 'y': 'b'}},\n", - " {'a': 'd', 'n': {'o': None, 't': 'a', '2': 3}}\n", + " {'a': 'b', 'custom': {'a': 'b'}},\n", + " {'a': 'c', 'custom': {'z': 'x', 'y': 'b'}},\n", + " {'a': 'd', 'custom': {'o': None, 't': 'a', '2': 3}}\n", "])\n", "\n", - "df.show(truncate=False)" + "example_df.show(truncate=False)" ] }, { "cell_type": "code", - "execution_count": 33, + "execution_count": 148, "id": "629ec980", "metadata": {}, "outputs": [], "source": [ "data = [\n", - " {'a': 'b', 'n': {'a': True, 'z': ['1', 7, True]}},\n", - " {'a': 'c', 'n': {'a': 'b', 'z': 'x', 't': None}},\n", - " {'a': 'd', 'n': {'a': 3, 'z': None}},\n", + " {'a': 'b', 'custom': {'a': True, 'z': ['1', 7, True, [1, 2]]}},\n", + " {'a': 'c', 'custom': {'a': 'b', 'z': 'x', 't': None}},\n", + " {'a': 'd', 'custom': {'a': 3, 'z': [True]}},\n", "]\n", "print('\\n'.join(json.dumps(r) for r in data), file=open('tmp.json', 'w'))" ] }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 149, "id": "f2e5e5de", "metadata": {}, "outputs": [ @@ -1322,9 +1324,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "{\"a\": \"b\", \"n\": {\"a\": true, \"z\": [\"1\", 7, true]}}\n", - "{\"a\": \"c\", \"n\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", - "{\"a\": \"d\", \"n\": {\"a\": 3, \"z\": null}}\n" + "{\"a\": \"b\", \"custom\": {\"a\": true, \"z\": [\"1\", 7, true, [1, 2]]}}\n", + "{\"a\": \"c\", \"custom\": {\"a\": \"b\", \"z\": \"x\", \"t\": null}}\n", + "{\"a\": \"d\", \"custom\": {\"a\": 3, \"z\": [true]}}\n" ] } ], @@ -1342,7 +1344,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 151, "id": "d28ed31f", "metadata": { "scrolled": true @@ -1352,13 +1354,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+--------------------------+\n", - "|a |n |\n", - "+---+--------------------------+\n", - "|b |{true, null, [\"1\",7,true]}|\n", - "|c |{b, null, x} |\n", - "|d |{3, null, null} |\n", - "+---+--------------------------+\n", + "+---+--------------------------------+\n", + "|a |custom |\n", + "+---+--------------------------------+\n", + "|b |{true, null, [\"1\",7,true,[1,2]]}|\n", + "|c |{b, null, x} |\n", + "|d |{3, null, [true]} |\n", + "+---+--------------------------------+\n", "\n" ] } @@ -1378,7 +1380,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 152, "id": "bfa038b5", "metadata": {}, "outputs": [ @@ -1396,7 +1398,7 @@ " },\n", " {\n", " \"metadata\": {},\n", - " \"name\": \"n\",\n", + " \"name\": \"custom\",\n", " \"nullable\": true,\n", " \"type\": {\n", " \"fields\": [\n", @@ -1432,6 +1434,14 @@ "Schema(df.schema).show()" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "8864f513-7b56-4367-bfd7-3d542a85f712", + "metadata": {}, + "outputs": [], + "source": [] + }, { 
"cell_type": "markdown", "id": "fcfa1d9b", @@ -1442,7 +1452,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 153, "id": "e48f5b25", "metadata": { "scrolled": true @@ -1452,13 +1462,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |n |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x, t -> null} |\n", - "|d |{a -> 3, z -> null} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1470,7 +1480,7 @@ }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 137, "id": "501cd405", "metadata": {}, "outputs": [ @@ -1478,13 +1488,21 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |n |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x} |\n", - "|d |{a -> 3} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x, t -> null} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", + "\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1492,9 +1510,10 @@ "source": [ "import pyspark.sql.functions as F\n", "\n", + "df.show(truncate=False)\n", "df_filtered = df.select(\n", " F.col('a'),\n", - " F.map_filter(F.col('n'), lambda k,v: v.isNotNull()).alias('n')\n", + " F.map_filter(F.col('custom'), lambda k,v: v.isNotNull()).alias('custom')\n", ")\n", "df_filtered.show(truncate=False)" ] @@ -1509,7 +1528,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 138, "id": "24efbc9c", "metadata": {}, "outputs": [ @@ -1517,13 +1536,13 @@ "name": "stdout", "output_type": "stream", "text": [ - "+---+------------------------------+\n", - "|a |custom |\n", - "+---+------------------------------+\n", - "|b |{a -> true, z -> [\"1\",7,true]}|\n", - "|c |{a -> b, z -> x} |\n", - "|d |{a -> 3} |\n", - "+---+------------------------------+\n", + "+---+------------------------------------+\n", + "|a |custom |\n", + "+---+------------------------------------+\n", + "|b |{a -> true, z -> [\"1\",7,true,[1,2]]}|\n", + "|c |{a -> b, z -> x} |\n", + "|d |{a -> 3, z -> [true]} |\n", + "+---+------------------------------------+\n", "\n" ] } @@ -1531,24 +1550,40 @@ "source": [ "df.createOrReplaceTempView(\"df\")\n", "df_filtered = spark.sql(\n", - " \"select a, map_filter(n, (k,v) -> v is not null) custom from df\"\n", + " \"select a, map_filter(custom, (k,v) -> v is not null) custom from df\"\n", ")\n", "df_filtered.show(truncate=False)" ] }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 139, "id": "049d7363", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + 
"{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"true\"}}\n", + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", + "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\",\"t\":null}}\n", + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"[true]\"}}\n" + ] + } + ], "source": [ + "! cat tmp_out.json/part*\n", + "df.write.format('json').mode('overwrite').save('tmp_out.json')\n", + "! cat tmp_out.json/part*\n", "df_filtered.write.format('json').mode('overwrite').save('tmp_out.json')" ] }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 140, "id": "665c9b31", "metadata": {}, "outputs": [ @@ -1556,9 +1591,9 @@ "name": "stdout", "output_type": "stream", "text": [ - "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true]\"}}\n", + "{\"a\":\"b\",\"custom\":{\"a\":\"true\",\"z\":\"[\\\"1\\\",7,true,[1,2]]\"}}\n", "{\"a\":\"c\",\"custom\":{\"a\":\"b\",\"z\":\"x\"}}\n", - "{\"a\":\"d\",\"custom\":{\"a\":\"3\"}}\n" + "{\"a\":\"d\",\"custom\":{\"a\":\"3\",\"z\":\"[true]\"}}\n" ] } ], @@ -1579,9 +1614,90 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 143, "id": "01b26da2", "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---------------------------------+\n", + "|a |custom |\n", + "+---+---------------------------------+\n", + "|b |{\"a\":true,\"z\":[\"1\",7,true,[1,2]]}|\n", + "|c |{\"a\":\"b\",\"z\":\"x\",\"t\":null} |\n", + "|d |{\"a\":3,\"z\":[true]} |\n", + "+---+---------------------------------+\n", + "\n", + "StructType([StructField('a', StringType(), True), StructField('custom', StringType(), True)])\n" + ] + }, + { + "data": { + "text/plain": [ + "[Row(from_json(custom)=Row(z=['1', '7', 'true', '[1,2]'])),\n", + " Row(from_json(custom)=Row(z=None)),\n", + " Row(from_json(custom)=Row(z=['true']))]" + ] + }, + "execution_count": 143, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyspark.sql import functions as F\n", + "from pyspark.sql import types as T\n", + "df = spark.read.json('tmp.json', schema=T.StructType(\n", + " [\n", + " T.StructField(\n", + " name='a', dataType = T.StringType()\n", + " ),\n", + " T.StructField(\n", + " name='custom',\n", + " dataType=T.StringType()\n", + " )\n", + " ]\n", + "))\n", + "df.show(truncate=False)\n", + "print(df.schema)\n", + "df.select(\n", + " F.from_json(\n", + " F.col('custom'),\n", + " schema=T.StructType(\n", + " [\n", + " T.StructField(\n", + " name='z',\n", + " dataType=T.ArrayType(T.BooleanType())\n", + " )\n", + " ]\n", + " )\n", + " )\n", + ").collect()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc822e9d-65b0-477d-8f1a-016bbcaf374f", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "86583c85-dce3-428e-9641-468346d9ed21", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "55ec7c51-98c1-4874-ab21-ff73f580720a", + "metadata": {}, "outputs": [], "source": [] } diff --git a/articles/20230605_pyspark_fu/not_there.json b/articles/20230605_pyspark_fu/not_there.json new file mode 100644 index 0000000..135ba47 --- /dev/null +++ b/articles/20230605_pyspark_fu/not_there.json @@ -0,0 +1 @@ +{"id": 123, "key": "yolo", "attrs": {"a": "b"}} \ No newline at end of file diff --git 
a/articles/20230605_pyspark_fu/tmp.json b/articles/20230605_pyspark_fu/tmp.json new file mode 100644 index 0000000..f999fd5 --- /dev/null +++ b/articles/20230605_pyspark_fu/tmp.json @@ -0,0 +1,3 @@ +{"a": "b", "custom": {"a": true, "z": ["1", 7, true, [1, 2]]}} +{"a": "c", "custom": {"a": "b", "z": "x", "t": null}} +{"a": "d", "custom": {"a": 3, "z": [true]}} diff --git a/articles/20230605_pyspark_fu/tmp_out.json/._SUCCESS.crc b/articles/20230605_pyspark_fu/tmp_out.json/._SUCCESS.crc new file mode 100644 index 0000000000000000000000000000000000000000..3b7b044936a890cd8d651d349a752d819d71d22c GIT binary patch literal 8 PcmYc;N@ieSU}69O2$TUk literal 0 HcmV?d00001 diff --git a/articles/20230605_pyspark_fu/tmp_out.json/.part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json.crc b/articles/20230605_pyspark_fu/tmp_out.json/.part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json.crc new file mode 100644 index 0000000000000000000000000000000000000000..610dd23da1c3949714879c69142d08e965940109 GIT binary patch literal 12 TcmYc;N@ieSU}BKtF>L_=4+;Vv literal 0 HcmV?d00001 diff --git a/articles/20230605_pyspark_fu/tmp_out.json/_SUCCESS b/articles/20230605_pyspark_fu/tmp_out.json/_SUCCESS new file mode 100644 index 0000000..e69de29 diff --git a/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json b/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json new file mode 100644 index 0000000..2d557ff --- /dev/null +++ b/articles/20230605_pyspark_fu/tmp_out.json/part-00000-39f4fd2b-6eae-4445-9d8e-03f27cf106e9-c000.json @@ -0,0 +1,3 @@ +{"a":"b","custom":{"a":"true","z":"[\"1\",7,true,[1,2]]"}} +{"a":"c","custom":{"a":"b","z":"x"}} +{"a":"d","custom":{"a":"3","z":"[true]"}} From acfc5e9524874f8cc3399bb41882a63586d861e2 Mon Sep 17 00:00:00 2001 From: Thomas McKeesick Date: Wed, 3 Apr 2024 22:43:54 +1100 Subject: [PATCH 04/10] add tiny article about flatpak hdd access --- README.md | 5 +++++ ...allow_flatpak_to_use_secondary_hard_drives.md | 16 ++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md diff --git a/README.md b/README.md index d453a1f..68dafdb 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ my blog --- +### [20240403 Allow flatpak to use secondary hard drives](articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md) + +> _Useful when sharing a steam installation_ + ### [20240129 CS CFG tricks #1](articles/20240129_cs_cfg_tricks_#1/20240129_cs_cfg_tricks_#1.md) > _[Counter-Strike] Execute multiple commands with a single keybind_ @@ -86,3 +90,4 @@ my blog + diff --git a/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md b/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md new file mode 100644 index 0000000..09815c7 --- /dev/null +++ b/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md @@ -0,0 +1,16 @@ +# 20240403 Allow flatpak to use secondary hard drives + +I run Steam on linux using the Flatpak version, which I find has less driver/configuration issues than the DEB/RPM equivalent. + +Every time I install it on a new OS, I need to ensure that it can access my shared SSD with my steam library directory on it. 
+ +> One note is that you'll need to ensure that your SSD partition is mounted in the correct place before running your flatpak program. +I'd recommend configuring this to happen at startup with FSTAB (see my [handy fstab article](./articles/20220703_fstab/20220703_fstab.md)) + +In this example, the directory is at `/mnt/external/`. My steam library is at `/mnt/external/SteamLibrary` + +```shell +flatpak override --user --filesystem=/mnt/external/ com.valvesoftware.Steam +``` + +Now, I can restart steam and add this library location! From dc0896556dda891c2fdbc7904f425cddabebde7a Mon Sep 17 00:00:00 2001 From: Thomas McKeesick Date: Wed, 3 Apr 2024 22:46:30 +1100 Subject: [PATCH 05/10] use relative link --- .../20240403_allow_flatpak_to_use_secondary_hard_drives.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md b/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md index 09815c7..8e51a6b 100644 --- a/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md +++ b/articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md @@ -5,7 +5,7 @@ I run Steam on linux using the Flatpak version, which I find has less driver/con Every time I install it on a new OS, I need to ensure that it can access my shared SSD with my steam library directory on it. > One note is that you'll need to ensure that your SSD partition is mounted in the correct place before running your flatpak program. -I'd recommend configuring this to happen at startup with FSTAB (see my [handy fstab article](./articles/20220703_fstab/20220703_fstab.md)) +I'd recommend configuring this to happen at startup with FSTAB (see my [handy fstab article](../20220703_fstab/20220703_fstab.md)) In this example, the directory is at `/mnt/external/`. My steam library is at `/mnt/external/SteamLibrary` From 7fbf32878d0fae57bb489d8c2f1ec4fed08ce13a Mon Sep 17 00:00:00 2001 From: Thomas McKeesick Date: Thu, 4 Apr 2024 00:00:13 +1100 Subject: [PATCH 06/10] add bom update --- .../20220306_python_datetimes.md | 11 ++ .../20230919_parsing_boms_in_python.md | 170 +++++++++++++++--- 2 files changed, 161 insertions(+), 20 deletions(-) diff --git a/articles/20220306_python_datetimes/20220306_python_datetimes.md b/articles/20220306_python_datetimes/20220306_python_datetimes.md index 9146476..2ab7e22 100644 --- a/articles/20220306_python_datetimes/20220306_python_datetimes.md +++ b/articles/20220306_python_datetimes/20220306_python_datetimes.md @@ -1,5 +1,16 @@ # 20220306 Python Datetimes +- [Standard Library vs. Packages](#standard-library-vs-packages) +- [Current Datetime](#current-datetime) + - [Current Datetime as UTC](#current-datetime-as-utc) + - [But what if my timezone isn't UTC?](#but-what-if-my-timezone-isnt-utc) + - [Using the computer's local timezone](#using-the-computers-local-timezone) +- [Bonus](#bonus) + - [Removing microseconds from the datetime](#removing-microseconds-from-the-datetime) + + +## Standard Library vs. Packages + Python `datetime` frequently trip me up when I use them, usually due to a combination of 1. 
dense documentation that is hard to read
diff --git a/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md
index 85678f3..7dba121 100644
--- a/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md
+++ b/articles/20230919_parsing_boms_in_python/20230919_parsing_boms_in_python.md
@@ -1,34 +1,164 @@
 # 20230919 Parsing BOMs in Python
+
+- [Introduction](#introduction)
+- [Show the BOM](#show-the-bom)
+- [Create a UTF8 file](#create-a-utf8-file)
+  - [Reading the file in Python](#reading-the-file-in-python)
+- [UTF-16](#utf-16)
+  - [The codecs package](#the-codecs-package)
+  - [BOM detection](#bom-detection)
+  - [Demo](#demo)
+
+## Introduction
+
+The "byte-order mark" or [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) is a special character that appears at the very beginning of UTF-8 and UTF-16 files.
+This marker is there to be our friend; however, many languages and libraries don't deal with it by default, and Python is no exception.
+
+You'll mostly encounter these files when working with data exported from Windows programs; otherwise they're fairly rare.
+
+## Show the BOM
+
+The BOM is the character `U+FEFF`, which UTF-8 encodes as the three bytes `EF BB BF`.
+
+On Linux, it's easy to create a test file to play around with.
+
+I like to use `cat -A` to check for non-printing characters; you can pipe anything to `cat` by using the `-` argument, e.g.
+
+```shell
+☯ ~ echo -e '\xEF\xBB\xBF' | cat -A -
+M-oM-;M-?$
+
+☯ ~ printf '\ufeff\n' | cat -A -
+M-oM-;M-?$
+```
+
+## Create a UTF8 file
+
+To create a UTF-8 file with a BOM, take the BOM character from above, add some extra text, and save it to a file.
+
+```shell
+☯ ~ printf '\ufeffhello world\n' > test.csv
+
+# check the file using the `file` command
+☯ ~ file test.csv
+test.csv: Unicode text, UTF-8 (with BOM) text, with no line terminators
+
+# check the file using cat -A
+☯ ~ cat -A test.csv
+M-oM-;M-?hello world
+```
+
+### Reading the file in Python
+
+When opening files, Python will not remove the BOM character by default.
+
+```python
+with open('test.csv') as istream:
+    s = istream.read()
+
+s
+# '\ufeffhello world'
+```
+
+However, this can be easily fixed by using the `utf-8-sig` encoding!
+The following info is buried within the [Python codec documentation](https://docs.python.org/3/library/codecs.html):
+
+> On encoding the utf-8-sig codec will write 0xef, 0xbb, 0xbf as the first three bytes to the file. On decoding utf-8-sig will skip those three bytes if they appear as the first three bytes in the file. In UTF-8, the use of the BOM is discouraged and should generally be avoided.
+
+```python
+with open('test.csv', encoding='utf-8-sig') as istream:
+    s = istream.read()
+
+s
+# 'hello world'
+```
+
+Now, you can see that the BOM character has been removed automatically! The same thing can be done with writing - automatically adding the BOM character by using the `utf-8-sig` encoding.
+
+```python
+with open('test.csv', 'w', encoding='utf-8-sig') as ostream:
+    print('hello world', file=ostream)
+```
+
+## UTF-16
+
+For UTF-16 files, the BOM comes in 2 flavors: big-endian and little-endian. The code point is `U+FEFF` in both cases; it's the byte order on disk that differs. Python's `utf-16` codec will consume the BOM if you already know a file is UTF-16, but there's no single encoding that covers both UTF-8 and UTF-16, so detecting an unknown encoding has to be done by hand.
+
+- UTF-16 BE BOM bytes: `FE FF`
+- UTF-16 LE BOM bytes: `FF FE`
+
+To help out, let's write a file with a UTF-16 BOM and some text.
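If you already know a file is UTF-16, the built-in `utf-16` codec will write the BOM for you and strip it again on read — a quick sketch (`bom_demo.txt` is just a throwaway example filename):

```python
import codecs

# the utf-16 codec adds a BOM on write (in native byte order)...
with open('bom_demo.txt', 'w', encoding='utf-16') as ostream:
    ostream.write('hello world\n')

# ...which shows up in the raw bytes
with open('bom_demo.txt', 'rb') as istream:
    istream.read(2) == codecs.BOM_UTF16_LE
# True (on a little-endian machine)

# ...and is stripped again when decoding
with open('bom_demo.txt', encoding='utf-16') as istream:
    istream.read()
# 'hello world\n'
```

Writing the BOM bytes manually, as in the next snippet, makes it easier to see exactly what ends up in the file.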
+ +```python +with open('test16.csv', 'wb') as ostream: + ostream.write(codecs.BOM_UTF16) + ostream.write(b'hello world\n') +``` + +```shell +☯ ~ file test16.csv +test16.csv: Unicode text, UTF-16, little-endian text, with no line terminators + +☯ ~ cat -A test16.csv +M-^?M-~hello world$ +``` + +### The codecs package + +The standard library has a `codecs` package that contains a few handy constants for the BOM characters. + +```python +import codecs + +codecs.BOM_UTF16_LE +# b'\xff\xfe' +codecs.BOM_UTF16_BE +# b'\xfe\xff' +``` + +### BOM detection + +Using these constants, we can make a function that will detect a BOM character at the start of a file, and return the correct encoding. + ```python - import csv, codecs +import codecs - CODECS = { - "utf-8-sig": [codecs.BOM_UTF8], - "utf-16": [ +CODECS = { + "utf-8-sig": [codecs.BOM_UTF8], + "utf-16": [ codecs.BOM_UTF16, codecs.BOM_UTF16_BE, codecs.BOM_UTF16_LE, ] - } +} - def detect_encoding(fpath): - with open(fpath, 'rb') as istream: - data = istream.read(3) - for encoding, boms in CODECS.items(): - if any(data.startswith(bom) for bom in boms): - return encoding - return 'utf-8' +def detect_encoding(fpath: str) -> str: + # open the file in bytes mode + with open(fpath, 'rb') as istream: + # read the first 3 bytes (the UTF-8 BOM is 3 chars, the UTF-16 BOM is 2) + data = istream.read(3) + # iterate over the codecs and return the encoding if the BOM is found + for encoding, boms in CODECS.items(): + if any(data.startswith(bom) for bom in boms): + return encoding + return 'utf-8' - def read(fpath): - with open(fpath, 'r', encoding=detect_encoding(fpath)) as istream: - yield from csv.DictReader(istream) +detect_encoding('test.csv') +# 'utf-8-sig' +detect_encoding('test16.csv') +# 'utf-16' ``` +### Demo + +Finally, you could use this encoding detection inline when reading a file! 
For this test, I used a UTF16 file that I found in this repo: https://github.com/stain/encoding-test-files
+
+```python
+
+with open(fpath, 'r', encoding=detect_encoding(fpath)) as istream:
+    s = istream.read()
+
+s
+# 'première is first\npremière is slightly different\nКириллица is Cyrillic\n𐐀 am Deseret\n'
+```
From 56446023b337805bab0835b3d1d5f0d1401a1920 Mon Sep 17 00:00:00 2001
From: Tom McKeesick
Date: Thu, 11 Apr 2024 18:06:45 +1000
Subject: [PATCH 07/10] article boilerplate

---
 README.md | 5 +++++
 .../20240411_join_and_coalesce_in_pyspark.md | 1 +
 2 files changed, 6 insertions(+)
 create mode 100644 articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md

diff --git a/README.md b/README.md
index 68dafdb..2040a8e 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,10 @@ my blog
 
 ---
 
+### [20240411 Join and Coalesce in Pyspark](articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md)
+
+> _Join two dataframes and merge their columns_
+
 ### [20240403 Allow flatpak to use secondary hard drives](articles/20240403_allow_flatpak_to_use_secondary_hard_drives/20240403_allow_flatpak_to_use_secondary_hard_drives.md)
 
 > _Useful when sharing a steam installation_
@@ -91,3 +95,4 @@ my blog
 
 
 
+
diff --git a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
new file mode 100644
index 0000000..bc0d12d
--- /dev/null
+++ b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
@@ -0,0 +1 @@
+# 20240411 Join and Coalesce in Pyspark
From e1b15d8d87090e152e21766233d2e18ac6f36622 Mon Sep 17 00:00:00 2001
From: Tom McKeesick
Date: Thu, 11 Apr 2024 18:23:00 +1000
Subject: [PATCH 08/10] add article on pyspark join & coalesce

---
 .../20240411_join_and_coalesce_in_pyspark.md | 103 ++++++++++++++++++
 1 file changed, 103 insertions(+)

diff --git a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
index bc0d12d..5a1f22a 100644
--- a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
+++ b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
@@ -1 +1,104 @@
 # 20240411 Join and Coalesce in Pyspark
+
+## Scenario
+
+I had the need to join two dataframes in pyspark and coalesce in order to capture all possible values for each ID key.
+
+- both of the dataframes have the same key column
+- records may exist in either dataframe, or both
+- the dataframes have some shared columns, and some unique columns
+- the shared columns may have null values in either dataframe
+
+## Example Dataframes
+
+***df1:***
+
+| id|var1|var2|var4|
+|---|----|----|----|
+|  1|null|  aa|  yy|
+|  2|   a|null|  yy|
+|  3|   b|null|  yy|
+|  4|   h|null|  yy|
+
+***df2:***
+
+| id|var1|var2|var3|
+|---|----|----|----|
+|  1|   f|  Ba|  xx|
+|  2|   a|  bb|  xx|
+|  3|   b|null|  xx|
+
+## The Solution
+
+First, let's create some dataframes to work with. See my [pyspark-fu](../20230605_pyspark_fu/20230605_pyspark_fu.md) article for associated tips and tricks for local pyspark development.
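The snippets below assume a running `SparkSession` called `spark` and the usual `pyspark.sql.functions` import. A minimal sketch of that setup, assuming plain local execution is enough (the pyspark-fu article linked above covers a more considered config):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# a bare-bones local session, just enough to run the examples that follow
spark = SparkSession.builder.master('local[1]').getOrCreate()
```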
+
+
+- df1 has an extra column `var4`
+- df2 has an extra column `var3`
+- both dataframes
+  - have the same key column `id`
+  - and shared columns `var1` and `var2`
+
+```python
+df1 = spark.createDataFrame([
+        (1, None, 'aa', 'yy'),
+        (2, 'a', None, 'yy'),
+        (3, 'b', None, 'yy'),
+        (4, 'h', None, 'yy'),
+    ],
+    'id int, var1 string, var2 string, var4 string',
+)
+
+df2 = spark.createDataFrame([
+        (1, 'f', 'Ba', 'xx'),
+        (2, 'a', 'bb', 'xx'),
+        (3, 'b', None, 'xx'),
+    ],
+    'id int, var1 string, var2 string, var3 string',
+)
+```
+
+The solution itself is fairly straightforward
+
+- do a full outer join, so that all records are included
+- just select the columns that are unique to each dataframe
+- for the shared columns, use `F.coalesce` to select the non-null value
+
+> *Note: when coalescing, the order of the dataframes matters. A value from df1 will be used whenever it is non-null, ignoring any valid value in df2*
+> ***Ensure that your "preferred" dataframe is df1!***
+
+```python
+def join_coalesce(key_columns, df1, df2):
+    shared_cols = (set(df1.columns) & set(df2.columns)) - set(key_columns)
+    unique_cols = (
+        (set(df2.columns)-set(df1.columns)) | (set(df1.columns)-set(df2.columns))
+    ) - set(key_columns)
+    print(f'{shared_cols=}, {unique_cols=}. {set(df2.columns)=}')
+
+    return (
+        df1
+        .join(df2, on=key_columns, how='full')
+        .select(
+            *[F.col(c) for c in key_columns],
+            *[F.coalesce(df1[i], df2[i]).alias(i) for i in shared_cols],
+            *[F.col(c) for c in unique_cols],
+        )
+    )
+
+result = join_coalesce(['id'], df1, df2)
+result.show()
+```
+
+Behold!
+
+```python
+shared_cols={'var1', 'var2'}, unique_cols={'var3', 'var4'}. set(df2.columns)={'var3', 'var1', 'id', 'var2'}
++---+----+----+----+----+
+| id|var1|var2|var3|var4|
++---+----+----+----+----+
+|  1|   f|  aa|  xx|  yy|
+|  2|   a|  bb|  xx|  yy|
+|  3|   b|null|  xx|  yy|
+|  4|   h|null|null|  yy|
++---+----+----+----+----+
+```
From 793cfe8bb2a45f77eafa31985ebbdde7a6711103 Mon Sep 17 00:00:00 2001
From: Tom McKeesick
Date: Thu, 11 Apr 2024 18:24:57 +1000
Subject: [PATCH 09/10] add link

---
 .../20240411_join_and_coalesce_in_pyspark.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
index 5a1f22a..595b62f 100644
--- a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
+++ b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
@@ -4,6 +4,8 @@ ## Scenario
 
 I had the need to join two dataframes in pyspark and coalesce in order to capture all possible values for each ID key.
 
+> I noticed an answer on stack overflow https://stackoverflow.com/a/68534723/4431563, and was inspired to see if I could improve it slightly.
+
 - both of the dataframes have the same key column
 - records may exist in either dataframe, or both
 - the dataframes have some shared columns, and some unique columns
From 97e613d4c291484361e1c101eea83c14eea498aa Mon Sep 17 00:00:00 2001
From: Tom McKeesick
Date: Thu, 11 Apr 2024 18:30:20 +1000
Subject: [PATCH 10/10] style tweak

---
 .../20240411_join_and_coalesce_in_pyspark.md | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
index 595b62f..9e91a6c 100644
--- a/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
+++ b/articles/20240411_join_and_coalesce_in_pyspark/20240411_join_and_coalesce_in_pyspark.md
@@ -71,19 +71,17 @@ The solution itself is fairly straightforward
 
 ```python
 def join_coalesce(key_columns, df1, df2):
-    shared_cols = (set(df1.columns) & set(df2.columns)) - set(key_columns)
-    unique_cols = (
-        (set(df2.columns)-set(df1.columns)) | (set(df1.columns)-set(df2.columns))
-    ) - set(key_columns)
-    print(f'{shared_cols=}, {unique_cols=}. {set(df2.columns)=}')
+    shared_columns = (set(df1.columns) & set(df2.columns)) - set(key_columns)
+    unique_columns = (set(df1.columns) ^ set(df2.columns)) - set(key_columns)
+    print(f'{key_columns=}, {shared_columns=}, {unique_columns=}')
 
     return (
         df1
         .join(df2, on=key_columns, how='full')
         .select(
             *[F.col(c) for c in key_columns],
-            *[F.coalesce(df1[i], df2[i]).alias(i) for i in shared_cols],
-            *[F.col(c) for c in unique_cols],
+            *[F.coalesce(df1[i], df2[i]).alias(i) for i in shared_columns],
+            *[F.col(c) for c in unique_columns],
         )
     )
 
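For anyone squinting at the set logic in that last tweak: `^` is symmetric difference, so it collapses the original union-of-two-differences into a single expression. A quick standalone sanity check, reusing the column names from the example dataframes (plain Python sets, no Spark needed):

```python
df1_cols = {'id', 'var1', 'var2', 'var4'}
df2_cols = {'id', 'var1', 'var2', 'var3'}
key_cols = {'id'}

# original formulation: columns that appear on only one side
old_unique = ((df2_cols - df1_cols) | (df1_cols - df2_cols)) - key_cols
# tidied formulation: symmetric difference
new_unique = (df1_cols ^ df2_cols) - key_cols

assert old_unique == new_unique == {'var3', 'var4'}
```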