Encrypt a Spark DataFrame before writing it to disk/file.
In certain scenarios, persisting a DataFrame to disk unencrypted, even temporarily, may be prohibited by regulation.
Simpler approaches exist; however, there may be use cases where the data is shipped out and the recipient, working with a different technology stack, may want to simply decrypt the entire file before loading it.
Currently, no option exists on `DataFrameWriter` to encrypt the DataFrame being written. However, there is an option to compress the DataFrame before writing it to a file on disk, provided by the option `codec`. Spark accepts the fully qualified class name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec`.
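For example, the same option can point at one of Hadoop's built-in codecs to compress (not encrypt) the output; the output path here is illustrative:

```scala
// Compress the written CSV files with Hadoop's standard gzip codec:
df.write.option("codec", "org.apache.hadoop.io.compress.GzipCodec").csv("/tmp/gzipped_out")
```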
We can use a class which implements the above interface to provide the encryption functionality. The main method this interface demands is:

```java
CompressionOutputStream createOutputStream(OutputStream out)
```

`CompressionOutputStream` extends the standard Java `OutputStream`.
Based on this, the high-level approach is as follows (a sketch follows the list):

- Create a class which implements `CompressionCodec`
- Provide the encryption logic in its `createOutputStream` method
- Return the encrypted stream wrapped as a `CompressionOutputStream`
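A minimal sketch of such a codec in Scala might look like the following. The helper names are illustrative rather than the project's actual code, only the write path is fleshed out, and the `.enc` extension is an assumption:

```scala
package com.ameet.codec

import java.io.{InputStream, OutputStream}
import org.apache.hadoop.io.compress._

class EncryptionCodec extends CompressionCodec {

  // The method that matters for writing: wrap the raw file stream in an
  // encrypting stream, then adapt it to the type Hadoop expects.
  override def createOutputStream(out: OutputStream): CompressionOutputStream = {
    val encrypted: OutputStream = wrapWithCipher(out)
    new NoopCompressionOutputStream(encrypted) // pass-through, described below
  }

  override def createOutputStream(out: OutputStream, c: Compressor): CompressionOutputStream =
    createOutputStream(out)

  // No real compressor/decompressor is involved; stubs suffice for the
  // write path used by Spark's text-based writers.
  override def getCompressorType: Class[_ <: Compressor] = null
  override def createCompressor(): Compressor = null
  override def getDecompressorType: Class[_ <: Decompressor] = null
  override def createDecompressor(): Decompressor = null

  // A read (decryption) path would be implemented analogously.
  override def createInputStream(in: InputStream): CompressionInputStream =
    throw new UnsupportedOperationException("decryption path not shown in this sketch")
  override def createInputStream(in: InputStream, d: Decompressor): CompressionInputStream =
    createInputStream(in)

  // Illustrative extension appended to the output file names.
  override def getDefaultExtension: String = ".enc"

  // Placeholder: a real implementation returns an encrypting stream here
  // (see the CipherOutputStream sketch later in this document).
  private def wrapWithCipher(out: OutputStream): OutputStream = out
}
```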
The command to execute the encryption is summarized as follows:

```scala
df.write.option("codec", "com.ameet.codec.EncryptionCodec").csv("<file path>")
```
- Generate appropriate public/private keys using `gpg` and store them in separate files under the `resources` folder.
- Refer to these files in `application.properties`.
- Change the passwords in `application.properties`.
- Build the jar using `./gradlew clean build -x test`.
- Prepare the `log4j.properties` file as desired and note its path on the local or driver machine.
- Invoke `spark-shell` as follows; this assumes that `log4j.properties` is available in the cwd:
```
spark-shell --conf "spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'" \
  --driver-java-options "-Dlog4j.configuration=file:///home/ubuntu/source/spark_encrypter/log4j.properties" \
  --jars build/libs/encrypter-0.3.jar
```
- Read and transform the DataFrame as needed:

```scala
val df = spark.read.csv("FL_insurance_sample.csv")
```
- Encrypt and save to the desired directory:

```scala
df.write.option("codec", "com.ameet.codec.EncryptionCodec").csv("ENC_FL_insurance")
```
The solution leverages encryption of an `OutputStream` in Java via the `CipherOutputStream` mechanism. In this approach, once the appropriate cipher initialization is done, a new `OutputStream` is generated by passing the incoming stream through an encryption transformation.
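A minimal sketch of that idea, assuming AES key and IV bytes are already available (the function name and cipher mode here are illustrative, not the project's actual code):

```scala
import java.io.OutputStream
import javax.crypto.{Cipher, CipherOutputStream}
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

// Wrap a raw stream so that every byte written to the returned stream is
// encrypted with AES before it reaches the underlying stream.
def encryptingStream(out: OutputStream, key: Array[Byte], iv: Array[Byte]): OutputStream = {
  val cipher = Cipher.getInstance("AES/CBC/PKCS5Padding")
  cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
  new CipherOutputStream(out, cipher)
}
```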
The project utilizes a Strategy Pattern that allows any encryption mechanism to be plugged in. It demonstrates two approaches:

- Symmetric encryption using AES
- Asymmetric encryption using PGP
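A sketch of what such a strategy interface might look like (the trait and class names are illustrative, not the project's actual code; the AES variant reuses the `CipherOutputStream` wrapping sketched above):

```scala
import java.io.OutputStream

// Each encryption mechanism is a strategy that knows how to wrap a raw stream.
trait EncryptionStrategy {
  def encrypt(out: OutputStream): OutputStream
}

// Symmetric variant: delegates to the AES CipherOutputStream helper above.
class AesEncryptionStrategy(key: Array[Byte], iv: Array[Byte]) extends EncryptionStrategy {
  override def encrypt(out: OutputStream): OutputStream = encryptingStream(out, key, iv)
}

// Asymmetric variant: would delegate to the PGP wrapper described below.
class PgpEncryptionStrategy(publicKeyPath: String) extends EncryptionStrategy {
  override def encrypt(out: OutputStream): OutputStream =
    throw new UnsupportedOperationException("PGP wrapping omitted in this sketch")
}
```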
For PGP, the project makes use of the BouncyCastle provider and a higher-level wrapper around it, `name.neuhalfen.projects.crypto.bouncycastle.openpgp`. This wrapper makes it convenient and clean to expose an output stream from the encryption process, specifically for PGP.
In order to plug the encryption into the compression hook, the code uses a dummy `NoopCompressionOutputStream` class extending `CompressionOutputStream`, which simply writes the data through without any compression.
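A minimal sketch of such a pass-through class, assuming nothing beyond the Hadoop API (the project's actual version may differ):

```scala
import java.io.OutputStream
import org.apache.hadoop.io.compress.CompressionOutputStream

// Pass-through: performs no compression, delegating every write straight
// to the underlying (already encrypting) stream.
class NoopCompressionOutputStream(os: OutputStream) extends CompressionOutputStream(os) {
  override def write(b: Int): Unit = os.write(b)
  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
  override def finish(): Unit = os.flush() // nothing to finalize beyond a flush
  override def resetState(): Unit = ()     // no compressor state to reset
}
```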
Windows spark command:

```
spark-shell --conf "spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties'" \
  --driver-java-options "-Dlog4j.configuration=file:///C:/Users/ameet.chaubal/Documents/encrypter/log4j.properties" \
  --jars .\build\libs\encrypter-0.3.jar
```
- Currently, only tested on `csv` output
- Performance characteristics have not been tested