Forked from https://github.com/rymurr/flight-spark-source and tailored for the Chainsformer flight service use case. When run in Spark, it supports two of the three modes defined by Spark Data Source V2:
- Batch
- Micro Batch Streaming
- Continuous Streaming
Install Java 8:
- For x64 processors, follow instructions here: https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html
- For arm64 processors, use Azul JDK as an alternative to the official package:
brew tap homebrew/cask-versions
brew install --cask zulu8
echo "export JAVA_HOME=`/usr/libexec/java_home -v 1.8`" >> ~/.zshrc
source ~/.zshrc
java -version
Install Maven:
brew install maven
Rebuild everything:
make build
Binary location
./target/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar
- Upload the binary to DBFS on Databricks (see the CLI sketch below).
- Install the binary on the Spark cluster.
- Write Spark jobs following the examples below.
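For the first two steps, one option is the Databricks CLI. The sketch below assumes the legacy CLI is already configured; the DBFS path and cluster id are placeholders, not values from this repo:

databricks fs cp ./target/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar dbfs:/FileStore/jars/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar
databricks libraries install --cluster-id <YOUR_CLUSTER_ID> --jar dbfs:/FileStore/jars/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar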
schema_name = "YOUR_SCHEMA"
blockchain = "ethereum"  # example values matching the host used below
network = "mainnet"
in_table = "transactions"
out_table = f"chainsformer_{blockchain}_{network}_{in_table}"
checkpoint_location = f"FileStore/users/{schema_name}/checkpoints/chainsformer/{out_table}"
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 100)
df = (spark.read.format("coinbase.chainsformer.apache.arrow.flight.spark")
.option("host", "ethereum-mainnet-prod.coinbase.com")
.option("port", "8000")
.option("blocks_start_offset", 14000000)
.option("blocks_end_offset", 15000000)
.option("blocks_per_partition", 100)
.option("blocks_max_batch_size", 20000)
.option("partition_by_size", 100000)
.load(in_table))
df.write.option("checkpointLocation", checkpoint_location).saveAsTable(f"{schema_name}.{out_table}")
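Once the batch write completes, a quick sanity check on the resulting table (reusing the variables from the example above) might look like this:

# Confirm the table was created and inspect its size and schema.
result = spark.table(f"{schema_name}.{out_table}")
print(f"{schema_name}.{out_table}: {result.count()} rows")
result.printSchema()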
schema_name = "YOUR_SCHEMA"
blockchain = "ethereum"  # example values matching the host used below
network = "mainnet"
in_table = "streamed_transactions"
out_table = f"chainsformer_{blockchain}_{network}_{in_table}"
checkpoint_location = f"FileStore/users/{schema_name}/checkpoints/chainsformer/{out_table}"
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 100)
df = (spark.readStream.format("coinbase.chainsformer.apache.arrow.flight.spark")
.option("host", "ethereum-mainnet-prod.coinbase.com")
.option("port", "8000")
.option("events_start_offset", 4000000)
.option("events_per_partition", 100)
.option("events_max_batch_size", 20000)
.option("partition_by_size", 100000)
.load(in_table))
df.writeStream.option("checkpointLocation", checkpoint_location).toTable(f"{schema_name}.{out_table}")
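By default Structured Streaming starts the next micro batch as soon as the previous one finishes; if a fixed cadence is preferred, a trigger can be attached to the same write. The one-minute interval below is only an illustrative assumption:

query = (df.writeStream
    .option("checkpointLocation", checkpoint_location)
    .trigger(processingTime="1 minute")
    .toTable(f"{schema_name}.{out_table}"))
query.awaitTermination()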
- blocks_start_offset: the start block height (inclusive). Applies to batch and micro batch.
- blocks_end_offset: the end block height (exclusive). Applies to batch.
- blocks_per_record: the number of blocks sent in one HTTP/2 stream package (record). Applies to batch and micro batch.
- blocks_per_partition: the number of blocks included in each partition, which maps to a Delta table file. If not enough blocks are available, whatever is available is used. Applies to batch and micro batch.
- blocks_max_batch_size: the maximum number of blocks in each micro batch. Applies to micro batch.
- partition_by_size: the size of the partition_by_col buckets to partition on. Applies to micro batch.
- partition_by_col: the column to partition by; used together with partition_by_size. Applies to micro batch.
- compression: the compression algorithm, LZ4_FRAME or ZSTD; leave unset to opt out (the default). Applies to batch and micro batch.
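As an illustration of how the partitioning and compression options compose with the reader, the sketch below uses a hypothetical partition column name; the actual column depends on the schema served by the flight service:

df = (spark.readStream.format("coinbase.chainsformer.apache.arrow.flight.spark")
    .option("host", "ethereum-mainnet-prod.coinbase.com")
    .option("port", "8000")
    .option("blocks_start_offset", 14000000)
    .option("blocks_max_batch_size", 20000)
    .option("partition_by_size", 100000)
    .option("partition_by_col", "block_number")  # hypothetical column name
    .option("compression", "LZ4_FRAME")
    .load("transactions"))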
To test code changes:
- Make code changes in the chainsformer-spark-source repo.
- Build the Jar with make build.
- Upload chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar to DBFS (see the CLI sketch after this list).
- Install the uploaded Jar on the Spark cluster.
- Start your cluster.
- Attach the notebook to the cluster and run the streaming job.
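For faster iteration, the upload and install steps can be scripted with the Databricks CLI. This is a sketch assuming the legacy CLI is configured and the Jar was previously installed as a cluster library from DBFS; the DBFS path and cluster id are placeholders:

make build
databricks fs cp --overwrite ./target/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar dbfs:/FileStore/jars/chainsformer-spark-source-1.0-SNAPSHOT-shaded.jar
databricks clusters restart --cluster-id <YOUR_CLUSTER_ID>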