Hi All,
I’m starting this thread to discuss FHIR analytics using Apache Spark and Cassandra.
Here is an update on my current progress. I have stored some FHIR resources in Cassandra and read them back using the DataStax Spark Cassandra Connector. I then convert this data into a Spark Dataset, which allows users to run Spark SQL queries on top of it.
The following is my sample main class:
package org.librehealth.fhir.analytics.cassandra;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.FhirEncoders;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.Patient;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
/**
 * Proof-of-concept driver for FHIR analytics on Apache Spark and Cassandra.
 *
 * <p>It (re)creates a Cassandra table, stores a FHIR resource string in it, reads the rows back
 * with the DataStax Spark Cassandra connector, parses each row into a DSTU3 {@link Patient},
 * converts the result into a typed {@link Dataset} with Bunsen encoders and runs a Spark SQL
 * query over it.
 */
public class App2 implements Serializable {

    /** Cassandra keyspace used by this sample; it is dropped and recreated on every run. */
    private static final String KEYSPACE = "simple_canary_cc";

    /** Table holding (key, serialized FHIR resource) pairs. */
    private static final String TABLE = "simple_pair";

    /**
     * Placeholder for a serialized FHIR DSTU3 Patient resource in JSON form.
     * NOTE(review): this literal is not valid FHIR JSON, so parsing it will fail when the
     * dataset is evaluated; replace it with a real Patient resource before running.
     */
    private static final String PATIENT = "Sample FHIR Patient Resource";

    /**
     * Runs the end-to-end sample against a local Spark master and a Cassandra node on 127.0.0.1.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Spark Canary (CC) - Test")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1");

        // Build the session from the same conf and derive the Java context from it, so that only
        // one SparkContext exists and this configuration is the one actually in effect.
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        try {
            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

            recreateSchema(CassandraConnector.apply(conf));
            writeSamplePatients(sc);
            JavaRDD<Patient> patients = readPatients(sc);

            FhirEncoders encoders = FhirEncoders.forStu3().getOrCreate();
            System.out.println("Creating data set from cassandra data");
            Dataset<Patient> patientDs =
                    spark.createDataset(patients.rdd(), encoders.of(Patient.class));

            System.out.println("Querying data using spark SQL");
            patientDs.createOrReplaceTempView("patient");
            Dataset<Row> males = spark.sql("SELECT * FROM patient where gender='male'");
            males.show();
        } finally {
            // Release the local Spark context even when a step above fails.
            spark.stop();
        }
    }

    /**
     * Drops and recreates the sample keyspace and table. Destructive: existing data is lost.
     *
     * @param connector connector used to open a CQL session
     */
    private static void recreateSchema(CassandraConnector connector) {
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS " + KEYSPACE);
            session.execute("CREATE KEYSPACE " + KEYSPACE + " WITH "
                    + "REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE " + KEYSPACE + "." + TABLE + " "
                    + "(key int PRIMARY KEY, value text)");
        }
    }

    /**
     * Writes the sample resource string into the sample table under key 1.
     *
     * @param sc Java Spark context used to parallelize the rows
     */
    private static void writeSamplePatients(JavaSparkContext sc) {
        List<SimplePair> pairs = Arrays.asList(new SimplePair(1, PATIENT));
        JavaRDD<SimplePair> pairRdd = sc.parallelize(pairs);
        javaFunctions(pairRdd)
                .writerBuilder(KEYSPACE, TABLE, mapToRow(SimplePair.class))
                .saveToCassandra();
    }

    /**
     * Reads every row of the sample table and parses its {@code value} column as a DSTU3 Patient.
     *
     * @param sc Java Spark context used to read from Cassandra
     * @return a lazily evaluated RDD of parsed patients
     */
    private static JavaRDD<Patient> readPatients(JavaSparkContext sc) {
        return javaFunctions(sc).cassandraTable(KEYSPACE, TABLE)
                .map(new Function<CassandraRow, Patient>() {
                    @Override
                    public Patient call(CassandraRow row) throws Exception {
                        // Reuse the shared context; create a fresh parser per call because
                        // parsers are cheap and are not documented as thread-safe.
                        IParser parser = FhirContextHolder.DSTU3.newJsonParser();
                        return parser.parseResource(Patient.class, row.getString("value"));
                    }
                });
    }

    /**
     * Lazily builds a single DSTU3 {@link FhirContext} per JVM (i.e. per executor) on first use.
     * Building a FhirContext is expensive, so it must not be done once per row.
     */
    private static final class FhirContextHolder {
        static final FhirContext DSTU3 = FhirContext.forDstu3();

        private FhirContextHolder() {
        }
    }

    /**
     * Java bean mapped to a row of the sample table by {@code mapToRow}: {@code key} is the
     * primary key and {@code value} holds the serialized resource text.
     */
    public static class SimplePair implements Serializable {
        private Integer key;
        private String value;

        /** No-arg constructor required for Java bean mapping. */
        public SimplePair() { /* Java Bean ctor */ }

        /**
         * @param key primary key
         * @param value serialized resource text
         */
        public SimplePair(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() { return key; }

        public void setKey(Integer key) { this.key = key; }

        public String getValue() { return value; }

        public void setValue(String value) { this.value = value; }

        @Override
        public String toString() {
            return MessageFormat.format("(key: {0}, val: {1})", key, value);
        }
    }
}
I’ll push my changes to the simple Spring Boot project that I’m working on. My plan is to extend this functionality and make it generic. I have used Bunsen encoders to convert the data into a Dataset.
@sunbiz @namratanehete @judywawira what do you think?