Concurrent Data Writing in Databricks with PySpark and Delta-rs Without Losing Data #2696
premtiwari14f asked this question in Q&A (unanswered).
Q1: Why does delta-rs not create the .crc file?
Q2: How can concurrent writes to a Delta table in Databricks be handled without losing data?
Spark code
```python
import pytz
import random
import time
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import os
import pandas as pd

# Function to generate a random date within the last 10 days
def random_date():
    return datetime.now().date() - timedelta(days=random.randint(0, 10))

# Function to write a DataFrame to the Delta table with retries
def write_with_retries(df, path, max_retries=10, initial_delay=0.5):
    attempt = 0
    while attempt < max_retries:
        try:
            df.write \
                .format("delta") \
                .mode("append") \
                .partitionBy("date") \
                .save(path)
            print("Data written to Delta Lake table successfully.")
            return  # Exit the function if the write succeeds
        except Exception as e:
            if "version 0 already exists" in str(e):
                attempt += 1
                delay = initial_delay * (2 ** (attempt - 1))  # Exponential backoff
                delay += random.uniform(0, 0.1)  # Add jitter to avoid collision
                print(f"Attempt {attempt} failed with error: {e}. Retrying in {delay:.2f} seconds.")
                time.sleep(delay)
            else:
                raise  # Raise other exceptions immediately

def fixed_date():
    return datetime.now().date() - timedelta(days=1)

# Main loop to generate and write data
delta_table_path = "/mnt/path"  # Replace with your S3 Delta Lake table path
for i in range(20):
    num_rows = 50000
    data = {
        'id': range(1, num_rows + 1),
        'name': [random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve']) for _ in range(num_rows)],
        'age': [random.choice([25, 30, 35, 40, 45]) for _ in range(num_rows)],
        'date': [fixed_date() for _ in range(num_rows)]
    }
    df = spark.createDataFrame(pd.DataFrame(data))  # `spark` is the ambient Databricks session
    write_with_retries(df, delta_table_path)
```
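The conflict only appears when two writers append at the same time. For reference, a minimal sketch of how the Spark writer above could be driven concurrently to exercise the retry logic, assuming the `write_with_retries`, `fixed_date`, and `delta_table_path` definitions from the snippet and the ambient `spark` session on Databricks:

```python
# Illustrative only: launch two concurrent appends against the same Delta
# table. Assumes `spark`, `write_with_retries`, `fixed_date`, and
# `delta_table_path` from the previous snippet are in scope.
import threading
import random
import pandas as pd

def build_df(num_rows=1000):
    # Small synthetic batch shaped like the data generated in the main loop.
    pdf = pd.DataFrame({
        'id': range(1, num_rows + 1),
        'name': [random.choice(['Alice', 'Bob']) for _ in range(num_rows)],
        'age': [random.choice([25, 30]) for _ in range(num_rows)],
        'date': [fixed_date() for _ in range(num_rows)],
    })
    return spark.createDataFrame(pdf)

threads = [
    threading.Thread(target=write_with_retries, args=(build_df(), delta_table_path))
    for _ in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```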
delta-rs code
```python
import os
import pandas as pd
from deltalake import DeltaTable, write_deltalake
from datetime import datetime, timedelta
import pytz
import random
import time
import delta.tables as delta  # unused here; requires delta-spark on the cluster

# Storage options so delta-rs uses DynamoDB as its S3 locking provider
storage_options = {
    'AWS_S3_LOCKING_PROVIDER': 'dynamodb',
    'DELTA_DYNAMO_TABLE_NAME': 'delta_log2'
}

s3_bucket = 'aaaaaaaa'
s3_delta_table_path = f's3://{s3_bucket}/mnt/path'

os.environ['AWS_ACCESS_KEY_ID'] = 'aaaaaaaaaaaaa'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
os.environ['AWS_REGION'] = 'eu-west-1'
os.environ['AWS_S3_ALLOW_UNSAFE_RENAME'] = 'true'

# Function to generate a random date within the last 10 days
def random_date():
    return datetime.now().date() - timedelta(days=random.randint(0, 10))

# Function to write a DataFrame to the Delta table with retries
def write_with_retries(df, max_retries=10, initial_delay=0.5):
    attempt = 0
    while attempt < max_retries:
        try:
            write_deltalake(
                s3_delta_table_path,
                df,
                mode="append",
                partition_by=["date"],
                storage_options=storage_options,
            )
            print("Data written to Delta Lake table successfully.")
            return  # Exit the function if the write succeeds
        except Exception as e:
            attempt += 1
            delay = initial_delay * (2 ** (attempt - 1))  # Exponential backoff
            delay += random.uniform(0, 0.1)  # Add jitter to avoid collision
            print(f"Attempt {attempt} failed with error: {e}. Retrying in {delay:.2f} seconds.")
            time.sleep(delay)
    raise RuntimeError(f"Write failed after {max_retries} attempts")

def fixed_date():
    return datetime.now().date() - timedelta(days=1)

# Main loop to generate and write data
for i in range(20):
    num_rows = 50000
    data = {
        'id': range(1, num_rows + 1),
        'name': [random.choice(['Alice', 'Bob', 'Charlie', 'David', 'Eve']) for _ in range(num_rows)],
        'age': [random.choice([25, 30, 35, 40, 45]) for _ in range(num_rows)],
        'date': [fixed_date() for _ in range(num_rows)]
    }
    df = pd.DataFrame(data)
    write_with_retries(df)
```
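On the delta-rs side, `AWS_S3_LOCKING_PROVIDER='dynamodb'` only gives safe concurrent commits if the DynamoDB table named in `DELTA_DYNAMO_TABLE_NAME` already exists with the key schema delta-rs expects (and `AWS_S3_ALLOW_UNSAFE_RENAME` should typically not be needed once the locking provider is in place). A one-off setup sketch using boto3; the `tablePath`/`fileName` key schema follows the delta-rs documentation for recent releases, so verify it against the `deltalake` version you run:

```python
# Setup sketch (not from the original post): create the DynamoDB table that
# delta-rs uses to coordinate S3 commits. Key schema per recent delta-rs docs.
import boto3

dynamodb = boto3.client("dynamodb", region_name="eu-west-1")
dynamodb.create_table(
    TableName="delta_log2",  # must match DELTA_DYNAMO_TABLE_NAME above
    KeySchema=[
        {"AttributeName": "tablePath", "KeyType": "HASH"},
        {"AttributeName": "fileName", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "tablePath", "AttributeType": "S"},
        {"AttributeName": "fileName", "AttributeType": "S"},
    ],
    BillingMode="PAY_PER_REQUEST",
)
```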