Processing the blockchain with Apache Spark

Emerging cryptocurrencies such as Bitcoin provide a unique opportunity to gain insight into the use and evolution of a currency over a period of time, which now spans almost six years. This is important for understanding digital currency systems and could also inform the development of novel empirical economic models.

However, the blockchain has passed the 400K block mark and each node now maintains an internal database of roughly 70 GB. Exporting data for analytics purposes blows up the data volume even further, which poses a challenge for analytics tasks.

So we have to think about alternative, horizontally scalable data infrastructures such as Apache Spark. Like Apache Hadoop, it is a distributed computing engine, with the main difference being that data can be kept in memory across iterative computing tasks such as joining and querying structured data.

In the following example I would like to show how to use SparkR, a light-weight R front-end to Spark, to plot the number of Bitcoin transactions per day, similar to this chart on blockchain.info.

We used the Bitcoin JSON-RPC API to export blockchain data into two distinct CSV datasets: a small one covering transactions up to block 180K (~630 MB) and a larger one up to block 380K (~17 GB). The dataset can be downloaded as follows:

curl -O https://storage.googleapis.com/sparkr_tutorial/sparkR-tutorial-dataset.tgz
tar xvfz sparkR-tutorial-dataset.tgz
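
A quick look at the raw files is optional but helps to make sense of the schemas we define later, since the CSV files carry no header row. Assuming the archive unpacks into a sparkR-tutorial-dataset directory (the layout here is inferred from the paths used below), a check in plain R could look like this:

# Optional sanity check of the extracted files; the directory name is assumed
# from the tarball and the paths used later in this tutorial
list.files("sparkR-tutorial-dataset", recursive = TRUE)

# Peek at the first raw rows of the blocks CSV -- note there is no header line
readLines("sparkR-tutorial-dataset/small/blocks.csv", n = 3)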

The R source code shown below, as well as a pointer to the dataset, is available in this GitHub repo along with instructions for setting up Spark locally.

We start off by setting up pointers to our downloaded dataset files. Note: we point to the small dataset since we are still working locally, not on a cluster.

DUMP_DIR <- c("/user/username/sparkR-tutorial-dataset/small")

BLOCKS_FILE <- paste(DUMP_DIR, "/blocks.csv", sep="")
TX_FILE <- paste(DUMP_DIR, "/transactions.csv", sep="")
REL_BLOCKS_TX_FILE <- paste(DUMP_DIR, "/rel_block_tx.csv", sep="")

Then we set up a pointer to our local Spark installation, load the SparkR library, and initialize a SparkContext and, since we are working with Spark's DataFrame API, also an SQLContext. We are using the Databricks spark-csv extension for reading the CSV files.

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
    Sys.setenv(SPARK_HOME = "/user/username/spark-1.6.1-bin-hadoop2.6")
}

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

sc <- sparkR.init(master = "local[*]",
                  appName = "Bitcoin SparkR Demo",
                  sparkEnvir = list(spark.driver.memory="4g"),
                  sparkPackages="com.databricks:spark-csv_2.11:1.3.0")

sqlContext <- sparkRSQL.init(sc)

Next we define SparkR DataFrames for our three CSVs and transform the Unix timestamps into dates, since that is what we want on the X-axis of our plot.

cat("Loading blocks dataset\n")
blocksSchema <- structType(
    structField("block_hash", "string"),
    structField("height", "integer"),
    structField("timestamp", "integer")
)
blocks <- read.df(sqlContext, BLOCKS_FILE,
                  source="com.databricks.spark.csv", schema = blocksSchema)

cat("Adding date column computed from timestamp column\n")
blocks$date <- from_unixtime(blocks$timestamp, "yyyy-MM-dd")

cat("Loading transaction dataset\n")
txSchema <- structType(
  structField("tx_hash", "string"),
  structField("is_coinbase", "boolean")
)
txs <- read.df(sqlContext, TX_FILE,
               source="com.databricks.spark.csv", schema = txSchema)

cat("Loading block -> transaction relationships\n")
relSchema <- structType(
  structField("block_hash", "string"),
  structField("tx_hash", "string")
)
rel_blocks_tx <- read.df(sqlContext, REL_BLOCKS_TX_FILE,
                         source="com.databricks.spark.csv", schema = relSchema)

Then we join our tables, count the number of transactions per day, and summarize the results in a DataFrame. Note: so far all computational steps happen on Spark, which means they could also run on a cluster.

cat("Joining block hash into transaction dataframe\n")
txs_w_block_hash <- join(txs, rel_blocks_tx,
                         txs$tx_hash == rel_blocks_tx$tx_hash, "left")

cat("Joining block date into transactions dataframe\n")
txs_w_date <- join(txs_w_block_hash, blocks,
                   txs_w_block_hash$block_hash == blocks$block_hash, "left")

cat("Grouping transactions by date\n")
tx_frequency <- summarize(groupBy(txs_w_date, txs_w_date$date),
                          count = n(txs_w_date$date))
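
Since Spark evaluates these transformations lazily, nothing has actually been computed yet; head() triggers the job but only returns a few rows to the driver. If you want a quick peek at intermediate results before collecting everything, something along these lines works:

# Optional: inspect a few of the busiest days without collecting the full result
head(arrange(tx_frequency, desc(tx_frequency$count)), 5)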

Now our data is almost in its final shape, but it is still living in Spark rather than in our local R environment. So the next step is to collect the Spark DataFrame and transform it into a local R data frame.

tx_frequency <- collect(arrange(tx_frequency, asc(tx_frequency$date)))
tx_frequency$date <- as.Date(tx_frequency$date)

Finally, we use ggplot2 in our local R environment to plot the figure.

library("ggplot2")
library("scales")

cat("Plotting transaction frequency\n")
g <- ggplot(tx_frequency, aes(date, count)) +
            geom_line(colour = "grey") +
            scale_x_date() +
            stat_smooth() +
            ylab("Number of transactions") +
            xlab("") +
            scale_y_continuous(labels=comma)
g
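
If you want to keep the chart around, ggsave() from ggplot2 can write it to disk; the filename and dimensions below are arbitrary.

# Persist the figure to disk (example filename and size)
ggsave("tx_per_day.png", plot = g, width = 10, height = 6, dpi = 150)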

While it makes sense to run a local Spark instance for prototyping analytics procedures on small datasets, such a setup would not scale to our larger dataset. However, if you submit the above code to a Spark cluster with a few worker nodes, it does scale to larger datasets as well. We tested it on a cluster with 4 workers, each with 48 GB of RAM, and processing the large dataset took approximately 2 minutes.
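
As a rough sketch of what that would look like, the main code change is the sparkR.init() call, plus pointing DUMP_DIR at the large dataset on a filesystem the workers can reach (e.g. HDFS). The master URL and memory settings below are placeholders and depend entirely on your own cluster.

# Sketch of a cluster setup; master URL and executor memory are placeholders
sc <- sparkR.init(master = "spark://your-spark-master:7077",
                  appName = "Bitcoin SparkR Demo",
                  sparkEnvir = list(spark.executor.memory = "8g"),
                  sparkPackages = "com.databricks:spark-csv_2.11:1.3.0")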