Project: FHIR Analytics Using Apache Spark and Cassandra

(Prashadi) #1

Hi All,

I’m starting this thread to discuss FHIR analytics using Apache Spark and Cassandra.

Here is an update on my current progress. I have stored some FHIR resources in Cassandra and read them back using the DataStax Spark Cassandra Connector. I then convert this data to a Dataset, which allows users to run Spark SQL on top of it.

The following is my sample main class.

package org.librehealth.fhir.analytics.cassandra;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.FhirEncoders;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.Patient;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class App2 implements Serializable {

    // Placeholder: replace with the JSON text of a real FHIR STU3 Patient resource.
    private static final String patient = "Sample FHIR Patient Resource";
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Spark Canary (CC) - Test")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);
        CassandraConnector connector = CassandraConnector.apply(conf);
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS simple_canary_cc");
            session.execute("CREATE KEYSPACE simple_canary_cc WITH " +
                    "REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE simple_canary_cc.simple_pair " +
                    "(key int PRIMARY KEY, value text)");
        }

        List<SimplePair> pairs = Arrays.asList(
                new SimplePair(1, patient)
        );


        JavaRDD<SimplePair> simplekvRDD = sc.parallelize(pairs);
        javaFunctions(simplekvRDD)
                .writerBuilder("simple_canary_cc", "simple_pair",
                        mapToRow(SimplePair.class))
                .saveToCassandra();
        // Read the rows back and parse each stored JSON string into a HAPI FHIR Patient.
        JavaRDD<Patient> cassandraRDD = javaFunctions(sc).cassandraTable("simple_canary_cc", "simple_pair")
                .map(new Function<CassandraRow, Patient>() {
                    @Override
                    public Patient call(CassandraRow cassandraRow) throws Exception {
                        // Note: a FhirContext is expensive to create; building one per row is
                        // acceptable for a sample but should be reused (e.g. per partition) at scale.
                        FhirContext fhirCtx = FhirContext.forDstu3();
                        IParser parser = fhirCtx.newJsonParser();
                        String patientStr = cassandraRow.getString("value");
                        return parser.parseResource(Patient.class, patientStr);
                    }
                });

        FhirEncoders encoders =
                FhirEncoders.forStu3().getOrCreate();
        System.out.println("Creating data set from cassandra data");
        Dataset<Patient> peopleDFq = spark.createDataset(cassandraRDD.rdd(), encoders.of(Patient.class));
        System.out.println("Querying data using spark SQL");
        peopleDFq.createOrReplaceTempView("patient");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM patient where gender='male'");
        sqlDF.show();
        // Release Spark resources once the sample query has run.
        spark.stop();
    }

    public static class SimplePair implements Serializable {
        private Integer key;
        private String value;

        public SimplePair() { /* Java Bean ctor */ }

        public SimplePair(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() { return key; }
        public void setKey(Integer key) { this.key = key; }

        public String getValue() { return value; }
        public void setValue(String value) { this.value = value; }

        @Override
        public String toString() {
            return MessageFormat.format("(key: {0}, val: {1})", key, value);
        }
    }
}

I’ll push my changes to the simple Spring Boot project that I’m working on. My approach is to extend this functionality and make it generic. I have used Bunsen encoders to convert the data to a Dataset.

@sunbiz @namratanehete @judywawira what do you think?


(Prashadi) #2

My work is currently available at https://gitlab.com/kavindya89/librehealth-fhir-analytics/.

Also please let us know your thoughts on my previous post.


(Saptarshi Purkayastha) #3

Are you creating this to run SQL queries over Spark? I don’t quite see how you are using Bunsen here. I would also think that writing SQL queries should be one of the options, but something like cohort queries, with drill-down analysis of that subset of patients, will be the main objective.


(Prashadi) #4

Thank you for the response. Yes, I’m converting FHIR resources to a Spark-compatible structure using Bunsen encoders, which allows us to run Spark SQL on the resulting datasets. I didn’t know about cohorts; I’ll look into that too.

        Dataset<Patient> peopleDFq = spark.createDataset(cassandraRDD.rdd(), encoders.of(Patient.class));

My plan is as follows.

  • Implement a REST API using Spring Boot that wraps the operations we are going to expose (it wraps the core analytics API of the Spring application).

  • Then build a quick UI to try out the capabilities. This lets us test and iteratively improve the application.
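
To make the layering in the first step concrete, here is a minimal plain-Java sketch of a thin endpoint wrapping a core analytics API. The names AnalyticsApi, QueryEndpoint, and Response are hypothetical and only for illustration; they are not taken from the project, and in the real application the wrapper would presumably be a Spring Boot controller and the core API would run Spark SQL.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical core analytics API; a real implementation would run Spark SQL.
interface AnalyticsApi {
    List<Map<String, String>> runQuery(String sql);
}

// Hypothetical thin wrapper, standing in for a Spring Boot controller method.
final class QueryEndpoint {
    private final AnalyticsApi api;

    QueryEndpoint(AnalyticsApi api) {
        this.api = api;
    }

    // Rejects blank queries with 400; otherwise delegates to the core API.
    Response handle(String sql) {
        if (sql == null || sql.trim().isEmpty()) {
            return new Response(400, Collections.emptyList());
        }
        return new Response(200, api.runQuery(sql));
    }

    // Minimal response holder: an HTTP-style status code plus the result rows.
    static final class Response {
        final int status;
        final List<Map<String, String>> rows;

        Response(int status, List<Map<String, String>> rows) {
            this.status = status;
            this.rows = rows;
        }
    }
}
```

Keeping the wrapper this thin means the UI and any other client go through the same core API, so the analytics logic can be tested without starting a web server.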

Since @yashdsaraf is working on storing the FHIR resources, for the moment I’m storing the FHIR resource content in a single column. I’ll move to fetching data from his model once it’s finalized.


(Prashadi) #5

@sunbiz @namratanehete @judywawira I have a few questions after looking at cohorts.

If we use Spark, the data has to be loaded from Cassandra whenever queries need to be run.

From what I see of cohorts, the cohort definition is ultimately turned into a SQL query that runs against the database.

So we can write a query builder and also offer native Spark SQL support. I’ll further evaluate the query-building approach against Spark SQL.
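
As a rough illustration of the query-builder idea, here is a minimal sketch that turns a simple cohort definition (a view name plus equality criteria) into a Spark SQL string. CohortQueryBuilder and its methods are hypothetical names, not part of Spark, Bunsen, or the project. The sketch assumes column names and values are simple tokens and rejects anything else rather than trying to escape it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a cohort-to-SQL builder; not an existing library class.
final class CohortQueryBuilder {
    private final String view;
    private final List<String> predicates = new ArrayList<>();

    private CohortQueryBuilder(String view) {
        this.view = requireToken(view, "[A-Za-z_][A-Za-z0-9_]*");
    }

    // Starts a query against a temp view, e.g. the "patient" view registered earlier.
    static CohortQueryBuilder from(String view) {
        return new CohortQueryBuilder(view);
    }

    // Adds an equality criterion; criteria are combined with AND in build().
    CohortQueryBuilder whereEquals(String column, String value) {
        String col = requireToken(column, "[A-Za-z_][A-Za-z0-9_]*");
        // Assumption: values are simple codes, so quotes and backslashes never appear.
        String val = requireToken(value, "[A-Za-z0-9_-]+");
        predicates.add(col + " = '" + val + "'");
        return this;
    }

    // Produces the SQL text that could then be passed to spark.sql(...).
    String build() {
        StringBuilder sql = new StringBuilder("SELECT * FROM ").append(view);
        if (!predicates.isEmpty()) {
            sql.append(" WHERE ").append(String.join(" AND ", predicates));
        }
        return sql.toString();
    }

    // Rejects input that does not fully match the allowed pattern.
    private static String requireToken(String text, String pattern) {
        if (text == null || !text.matches(pattern)) {
            throw new IllegalArgumentException("Unsupported token: " + text);
        }
        return text;
    }
}
```

For example, CohortQueryBuilder.from("patient").whereEquals("gender", "male").build() yields a query equivalent to the one in my sample class, so a builder like this could sit alongside native Spark SQL support rather than replace it.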