Project: FHIR Analytics Using Apache Spark and Cassandra

(Prashadi) #1

Hi All,

I’m starting this thread to discuss FHIR analytics using Apache Spark and Cassandra.

I would like to share my current progress. I have stored some FHIR resources in Cassandra and read them using the DataStax Spark Cassandra Connector. Then I converted this data into a Dataset, which allows users to run Spark SQL on top of it.

The following is my sample main class:

package org.librehealth.fhir.analytics.cassandra;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.FhirEncoders;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.Patient;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class App2 implements Serializable {

    // Placeholder: replace with the JSON of a real FHIR STU3 Patient resource.
    private static String patient = "Sample FHIR Patient Resource";

    // Creating a FhirContext is expensive, but the context is thread-safe, so build it
    // once per JVM and reuse it; parsers obtained from it are cheap to create.
    private static final FhirContext FHIR_CONTEXT = FhirContext.forDstu3();

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Spark Canary (CC) - Test")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        // Build a single SparkSession from the configuration and derive the
        // JavaSparkContext from it, so both APIs share one SparkContext.
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        CassandraConnector connector = CassandraConnector.apply(conf);

        // Recreate a keyspace with one table that stores a JSON resource per row.
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS simple_canary_cc");
            session.execute("CREATE KEYSPACE simple_canary_cc WITH " +
                    "REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE simple_canary_cc.simple_pair " +
                    "(key int PRIMARY KEY, value text)");
        }

        List<SimplePair> pairs = Arrays.asList(
                new SimplePair(1, patient)
        );

        // Write the sample resource to Cassandra.
        JavaRDD<SimplePair> simplekvRDD = sc.parallelize(pairs);
        javaFunctions(simplekvRDD)
                .writerBuilder("simple_canary_cc", "simple_pair",
                        mapToRow(SimplePair.class))
                .saveToCassandra();

        // Read the rows back and parse the "value" column into HAPI FHIR Patient objects.
        JavaRDD<Patient> cassandraRDD = javaFunctions(sc).cassandraTable("simple_canary_cc", "simple_pair")
                .map(new Function<CassandraRow, Patient>() {
                    @Override
                    public Patient call(CassandraRow cassandraRow) throws Exception {
                        IParser parser = FHIR_CONTEXT.newJsonParser();
                        return parser.parseResource(Patient.class, cassandraRow.getString("value"));
                    }
                });

        // Bunsen encoders map the HAPI FHIR Patient class to a Spark SQL schema.
        FhirEncoders encoders =
                FhirEncoders.forStu3().getOrCreate();
        System.out.println("Creating data set from cassandra data");
        Dataset<Patient> peopleDFq = spark.createDataset(cassandraRDD.rdd(), encoders.of(Patient.class));
        System.out.println("Querying data using spark SQL");
        peopleDFq.createOrReplaceTempView("patient");
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM patient WHERE gender = 'male'");
        sqlDF.show();

        spark.stop();
    }

    public static class SimplePair implements Serializable {
        private Integer key;
        private String value;

        public SimplePair() { /* Java Bean ctor */ }

        public SimplePair(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() { return key; }
        public void setKey(Integer key) { this.key = key; }

        public String getValue() { return value; }
        public void setValue(String value) { this.value = value; }

        @Override
        public String toString() {
            return MessageFormat.format("(key: {0}, val: {1})", key, value);
        }
    }
}

I’ll push my changes to the simple Spring Boot project that I’m working on. My approach is to extend this functionality and make it generic. I have used Bunsen encoders to convert the data to a Dataset.

@sunbiz @namratanehete @judywawira what do you think?


(Prashadi) #2

My current work is at https://gitlab.com/kavindya89/librehealth-fhir-analytics/.

Please also let us know your thoughts on my previous post.


(Saptarshi Purkayastha) #3

Are you creating this to run SQL queries over Spark? I don’t quite see how you are using Bunsen here. I would also think that writing SQL queries should be one of the options, but something like cohort queries and drill-down analysis of that subset of patients will be the main objective.


(Prashadi) #4

Thank you for the response. Yes, I’m converting FHIR resources to a Spark-compatible structure using Bunsen encoders, which allows us to run Spark SQL on the datasets. I didn’t know about cohorts; I’ll check that too.

        Dataset<Patient> peopleDFq = spark.createDataset(cassandraRDD.rdd(), encoders.of(Patient.class));

My plan is as follows:

  • Implement a REST API using Spring Boot that wraps the operations we are going to expose (this wraps the core analytics API of the Spring application).

  • Then build a quick UI to try out the capabilities. This lets us test and iteratively improve the application.

Since @yashdsaraf is working on storing the FHIR resources, for the moment I’m storing the FHIR resource content in a single column. I’ll move to fetching data from his model once it’s finalized.


(Prashadi) #5

@sunbiz @namratanehete @judywawira I have a few questions after looking at cohorts.

If we use Spark, data has to be loaded from Cassandra whenever queries need to run.

From what I can see, the cohort module ultimately generates SQL from the cohort definition and runs it against the database.

So we can write a query builder and also offer native Spark SQL support. I’ll further evaluate the query-building approach against Spark SQL.
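One way to prototype the query-builder idea is to translate simple cohort criteria into a Spark SQL string that is then passed to spark.sql(...). The sketch below assumes one temp view per resource type (like the patient view registered in the first post); CohortQueryBuilder, where and build are hypothetical names, not part of Spark or Bunsen. Column names are validated and string values escaped, so user input cannot inject arbitrary SQL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: builds "SELECT * FROM <view> WHERE ..." for Spark SQL temp views.
final class CohortQueryBuilder {
    // Plain or dotted names such as gender or valueQuantity.value.
    private static final Pattern IDENTIFIER =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*");
    private static final List<String> OPERATORS = Arrays.asList("=", "!=", "<", "<=", ">", ">=");

    private final String view;
    private final List<String> conditions = new ArrayList<>();

    CohortQueryBuilder(String view) {
        this.view = requireIdentifier(view);
    }

    // Spark SQL string literals use backslash escapes, so escape backslashes and quotes.
    CohortQueryBuilder where(String column, String op, String value) {
        return add(column, op, "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'");
    }

    CohortQueryBuilder where(String column, String op, long value) {
        return add(column, op, String.valueOf(value));
    }

    CohortQueryBuilder where(String column, String op, double value) {
        return add(column, op, String.valueOf(value));
    }

    private CohortQueryBuilder add(String column, String op, String literal) {
        if (!OPERATORS.contains(op)) {
            throw new IllegalArgumentException("Unsupported operator: " + op);
        }
        conditions.add(requireIdentifier(column) + " " + op + " " + literal);
        return this;
    }

    // All conditions are combined with AND, i.e. the cohort must satisfy every criterion.
    String build() {
        String sql = "SELECT * FROM " + view;
        return conditions.isEmpty() ? sql : sql + " WHERE " + String.join(" AND ", conditions);
    }

    private static String requireIdentifier(String name) {
        if (!IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid identifier: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        String sql = new CohortQueryBuilder("observation")
                .where("status", "=", "final")
                .where("valueQuantity.value", ">", 15)
                .build();
        // SELECT * FROM observation WHERE status = 'final' AND valueQuantity.value > 15
        System.out.println(sql);
    }
}
```

A real cohort builder would also need OR groups and joins across views; this only shows the translation step.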


(Prashadi) #6

@sunbiz @namratanehete @judywawira I have tried more advanced SQL queries in https://gitlab.com/kavindya89/librehealth-fhir-analytics/blob/master/src/main/java/org/librehealth/fhir/analytics/cassandra/App.java.

Also, during the past week I took all the STU3 data from https://github.com/google/fhir.git and implemented loading it into Cassandra, as I needed a solid data set. You can find the implementation in https://gitlab.com/kavindya89/librehealth-fhir-analytics/blob/master/src/main/java/org/librehealth/fhir/analytics/cassandra/CassandraDataStoreServiceImpl.java

I’ll be writing a blog post about my current work.

As complex query support can be achieved via Spark SQL, I’m looking at the cohort module to see how dynamic query generation can be provided.

You can run the App.java class after executing mvn clean install. The only prerequisite is to start apache-cassandra-2.2.12; the data will be populated automatically.
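Before running App.java, the prerequisite can be checked programmatically. This sketch only tests whether something accepts TCP connections on 127.0.0.1:9042, Cassandra's default native transport (CQL) port; it does not verify that the listener is actually Cassandra, or version 2.2.12. CassandraPreflight and isListening are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check for a local Cassandra node.
final class CassandraPreflight {
    // Default CQL port; change it if cassandra.yaml sets a different native_transport_port.
    static final int DEFAULT_CQL_PORT = 9042;

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (isListening("127.0.0.1", DEFAULT_CQL_PORT, 2000)) {
            System.out.println("Something is listening on the CQL port; App.java can connect.");
        } else {
            System.out.println("Start apache-cassandra-2.2.12 before running App.java.");
        }
    }
}
```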


(Prashadi) #7

@sunbiz @namratanehete @judywawira

During the past week, I restructured the project and created a sample Spring Boot application that allows users to run Spark queries over the FHIR data. For the moment, I’m loading Google’s sample FHIR data (Patient/Observation) into Apache Spark.

Please feel free to try it and let me know your feedback. Please follow the README and use admin/admin as the login credentials.

I have written a blog post at https://medium.com/@prkpbandara/gsoc-librehealth-apache-spark-with-cassandra-for-fhir-analytics-9ae95f684032

My code is at https://gitlab.com/kavindya89/librehealth-fhir-analytics/tree/master

Currently I’m working on providing cohort-builder-like functionality.

@sunbiz @namratanehete @judywawira

According to @yashdsaraf’s last post, FHIR resources are stored in a column-wise form. Going forward, I think I need to integrate this component into @yashdsaraf’s repository. If we store data in a column-wise format, could we use a CQL-based approach for the query builder? I don’t think it would scale, though.


(Saptarshi Purkayastha) #8

Why do you think it will not scale?


(Prashadi) #9

@sunbiz sorry for the delay; I had to hand over my laptop for a RAM upgrade, as my machine kept getting stuck when running Cassandra and Spark together with the IDE.

The reason I believe it will not scale is that as the database grows, Cassandra distributes the data across the cluster nodes. So if we execute cluster-wide joins over the FHIR data, it will put a very high load on the Cassandra nodes. Using Spark will reduce that load, as it executes the queries across the cluster in a distributed manner. What do you think?
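To make the scaling argument concrete: Spark can execute an equi-join such as patient JOIN observation as a shuffle join, repartitioning both sides by the join key so that every partition can be joined locally by a separate task. The toy below runs that idea in a single JVM with plain collections; it illustrates the technique and is not Spark's implementation. ShuffleJoinSketch is a hypothetical name, and in the toy data an observation's subject is the bare patient id rather than a FHIR reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy, single-JVM illustration of a shuffle (repartition) hash join.
final class ShuffleJoinSketch {
    static final class Observation {
        final String subject;
        final double value;

        Observation(String subject, double value) {
            this.subject = subject;
            this.value = value;
        }
    }

    // Rows with the same join key always land in the same partition.
    static int partitionOf(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    // Returns "patientId:value" for observations above the threshold that reference a known patient.
    static List<String> join(List<String> patientIds, List<Observation> observations,
                             double threshold, int partitions) {
        // Shuffle phase: route both sides to the partition that owns each key.
        List<Set<String>> patientParts = new ArrayList<>();
        List<List<Observation>> observationParts = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            patientParts.add(new HashSet<>());
            observationParts.add(new ArrayList<>());
        }
        for (String id : patientIds) {
            patientParts.get(partitionOf(id, partitions)).add(id);
        }
        for (Observation o : observations) {
            if (o.value > threshold) { // filter before shuffling so less data moves
                observationParts.get(partitionOf(o.subject, partitions)).add(o);
            }
        }
        // Join phase: each partition only needs its own data (a separate task in Spark).
        List<String> result = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            for (Observation o : observationParts.get(i)) {
                if (patientParts.get(i).contains(o.subject)) {
                    result.add(o.subject + ":" + o.value);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> patients = Arrays.asList("p1", "p2");
        List<Observation> observations = Arrays.asList(
                new Observation("p1", 20.0), new Observation("p2", 10.0), new Observation("p3", 30.0));
        System.out.println(join(patients, observations, 15, 4)); // [p1:20.0]
    }
}
```

Because no partition needs another partition's rows, the join work spreads across executors instead of concentrating on the storage nodes.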


(Namratanehete) #10

@prashadi You have a good point. But that means Spark will be a required dependency. Spark might be a requirement for the analytics engine, but for the platform as a whole I suggest we avoid a Spark dependency. Cassandra alone should work.


(Prashadi) #11

Thank you for the reply @namratanehete. We can make analytics work with Cassandra only; it will then be based solely on CQL. However, Bunsen exposes FHIR structures in a unified format, which lets users perform more advanced queries in Spark than with Cassandra CQL. The capabilities of CQL will depend on how @yashdsaraf’s structure stores data in Cassandra. One more thing to consider is that a single FHIR resource itself contains complex elements, so we will need to store the data in appropriate columns that can be used via CQL, because the query builder will ultimately build either CQL or Spark SQL depending on the approach we take. I will do more research on this and come up with a small comparison.
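The point about complex elements can be made concrete by flattening a FHIR resource (parsed from JSON into nested maps and lists) into dotted column names, which is roughly what a column-per-element layout requires. This sketch uses plain Java collections; ResourceFlattener and the indexed naming scheme such as name[0].family are hypothetical. Note that repeating elements like Patient.name produce a different set of columns for each resource, which a fixed Cassandra table schema cannot express without collections or user-defined types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: flattens nested maps/lists into "path -> primitive value" columns.
final class ResourceFlattener {
    static Map<String, Object> flatten(Map<String, ?> resource) {
        Map<String, Object> columns = new LinkedHashMap<>(); // keeps element order
        flattenInto("", resource, columns);
        return columns;
    }

    private static void flattenInto(String prefix, Object node, Map<String, Object> out) {
        if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String key = prefix.isEmpty() ? e.getKey().toString() : prefix + "." + e.getKey();
                flattenInto(key, e.getValue(), out);
            }
        } else if (node instanceof List) {
            List<?> items = (List<?>) node;
            for (int i = 0; i < items.size(); i++) {
                // Repeating elements get one index per occurrence.
                flattenInto(prefix + "[" + i + "]", items.get(i), out);
            }
        } else {
            out.put(prefix, node); // primitive leaf becomes one column
        }
    }

    public static void main(String[] args) {
        Map<String, Object> name = new LinkedHashMap<>();
        name.put("family", "Smith");
        name.put("given", Arrays.asList("John"));
        Map<String, Object> patient = new LinkedHashMap<>();
        patient.put("resourceType", "Patient");
        patient.put("gender", "male");
        patient.put("name", Collections.singletonList(name));
        // {resourceType=Patient, gender=male, name[0].family=Smith, name[0].given[0]=John}
        System.out.println(flatten(patient));
    }
}
```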


(Yash D. Saraf) #12

Currently I’m using converters to store complex (non-primitive) data types, which means many of them are stored as JSON. To store the data in separate columns, these complex data types would need to be declared as User Defined Types in Cassandra.
Spring Data Cassandra provides a way to do this by annotating the complex data type classes with @UserDefinedType, but this poses the same problem as before, i.e. we can’t annotate the HAPI FHIR classes without extending them (and in this case extending won’t work either) or duplicating them.


(Prashadi) #13

Hi all,

@sunbiz @namratanehete @judywawira

I have done some comprehensive research on using CQL and Spark SQL.

Cassandra is a write-optimized database. Cassandra user guides suggest duplicating data when that optimizes read operations. Neither CQL nor the Cassandra APIs support JOIN operations across multiple tables, which means complex queries across multiple tables can’t be supported. This follows from Cassandra’s NoSQL design. As I showed earlier, Spark SQL allows users to write complex SQL for drill-down analysis. If we use CQL, we can only query one resource type at a time.

Here is an example of a complex Spark SQL query that selects patients based on observation valueQuantity:

SELECT patient.id, observation.id, observation.subject, observation.valueQuantity FROM patient INNER JOIN observation ON observation.subject.reference = patient.id WHERE observation.valueQuantity.value > 15

Another limitation of Cassandra is that the WHERE clause can only be used with primary key or indexed columns[1]. For example, if a user wants to search patients by first name, then the first name column must be indexed. Likewise, if we need to provide more query options based on columns, we will need to create secondary indexes. Cassandra doesn’t encourage creating a large number of indexes, as they affect write performance.

CQL provides an option to filter on non-indexed columns via ALLOW FILTERING[2], but its use is discouraged because it can make Cassandra scan large amounts of data to answer a query.
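A query builder that targets CQL would have to encode these rules. The sketch below is a simplified model: it treats a caller-supplied set of primary-key and secondary-index columns as queryable and refuses any other column unless ALLOW FILTERING is explicitly requested. Real Cassandra restrictions (restricting the full partition key, clustering-column order, index types) are stricter than this; CqlSelectBuilder and the fhir.patient table are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified CQL SELECT builder that guards against unindexed filters.
final class CqlSelectBuilder {
    private final String table; // trusted, caller-supplied keyspace.table
    private final Set<String> queryableColumns; // primary-key + secondary-index columns
    private final List<String> conditions = new ArrayList<>();
    private final List<String> unindexed = new ArrayList<>();

    CqlSelectBuilder(String table, Set<String> queryableColumns) {
        this.table = table;
        this.queryableColumns = queryableColumns;
    }

    CqlSelectBuilder whereEquals(String column, String value) {
        if (!queryableColumns.contains(column)) {
            unindexed.add(column);
        }
        // CQL escapes a single quote inside a string literal by doubling it.
        conditions.add(column + " = '" + value.replace("'", "''") + "'");
        return this;
    }

    String build(boolean allowFiltering) {
        StringBuilder cql = new StringBuilder("SELECT * FROM ").append(table);
        if (!conditions.isEmpty()) {
            cql.append(" WHERE ").append(String.join(" AND ", conditions));
        }
        if (!unindexed.isEmpty()) {
            if (!allowFiltering) {
                throw new IllegalStateException("Not indexed: " + unindexed
                        + " (create an index or explicitly allow filtering)");
            }
            cql.append(" ALLOW FILTERING"); // only appended when actually needed
        }
        return cql.toString();
    }

    public static void main(String[] args) {
        Set<String> queryable = new HashSet<>(Arrays.asList("id", "gender"));
        // SELECT * FROM fhir.patient WHERE gender = 'male'
        System.out.println(new CqlSelectBuilder("fhir.patient", queryable)
                .whereEquals("gender", "male").build(false));
        // SELECT * FROM fhir.patient WHERE family = 'Smith' ALLOW FILTERING
        System.out.println(new CqlSelectBuilder("fhir.patient", queryable)
                .whereEquals("family", "Smith").build(true));
    }
}
```

Every new search attribute either needs another index (slower writes) or ALLOW FILTERING (slower reads), which is the trade-off described above.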

Online resources[3] also suggest Spark as a viable option for executing complex queries.

@sunbiz @yashdsaraf I think we need to review our data model for storing patients carefully. From my understanding of FHIR, we will need to support basic search operations based on resource attributes. If our data model doesn’t support these, it will create performance issues as the data grows. I’m going to write a blog post about my findings.

Given these limitations, I think Spark is the viable option. Are we going to take a modular approach in LibreHealth? If so, FHIR Analytics could fit in as a separate module and be used where appropriate.

References

[1] https://docs.datastax.com/en/cql/3.1/cql/cql_reference/select_r.html

[2] https://www.datastax.com/dev/blog/allow-filtering-explained-2

[3] https://www.datastax.com/2015/03/how-to-do-joins-in-apache-cassandra-and-datastax-enterprise


(Prashadi) #14

@sunbiz @yashdsaraf would it be a good idea to store the entire resource as JSON in a column of each resource’s table?


(Prashadi) #15

@sunbiz @namratanehete @judywawira I have completed the blog post with my findings: https://medium.com/@prkpbandara/gsoc-librehealth-working-with-cassandra-for-fhir-analytics-9e66eecec6a7.

Let me know your thoughts and suggestions.


(Prashadi) #16

@sunbiz @namratanehete @judywawira since CQL has several limitations, I’ll be focusing on a Spark-based query builder approach. If you have any suggestions, please do let me know. My plan is to get the initial version done and merge it with @yashdsaraf’s repository.


(Namratanehete) #17

@prashadi I asked you yesterday in the LibreHealth chat: are you missing a dependency for jackson-databind? I am getting “java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException” while trying to execute your code. I can see you have specified jackson.version in pom.xml, but it is not used anywhere.
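InvalidDefinitionException was added in jackson-databind 2.9, so one plausible cause of this error is an older jackson-databind jar winning on the classpath through a transitive dependency. Besides inspecting mvn dependency:tree -Dincludes=com.fasterxml.jackson.core:jackson-databind, a small diagnostic like the one below prints which jar actually supplies a class at runtime. ClassOrigin and locate are hypothetical names.

```java
import java.security.CodeSource;

// Diagnostic sketch: reports where (if anywhere) a class is loaded from.
final class ClassOrigin {
    static String locate(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // JDK classes come from the bootstrap loader and have no code source.
            return source == null ? "JDK/bootstrap" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        // ObjectMapper exists in every jackson-databind version; compare it with the 2.9+ class.
        System.out.println(locate("com.fasterxml.jackson.databind.ObjectMapper"));
        System.out.println(locate("com.fasterxml.jackson.databind.exc.InvalidDefinitionException"));
    }
}
```

If ObjectMapper resolves to an older jackson-databind jar while the second class is reported as missing, pinning jackson-databind (for example via dependencyManagement using the existing jackson.version property) is a likely fix.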


(Prashadi) #18

@namratanehete Let me quickly check on this and get back to you soon. Sorry, I missed the notifications.


(Prashadi) #19

@namratanehete I have converted the project to a WAR. After several hours of resolving dependency issues, it now deploys successfully as a web application.

Please follow https://gitlab.com/kavindya89/librehealth-fhir-analytics/blob/master/README.md to deploy the project and try out the functionality.


(Prashadi) #20

Hi All,

To give an update, I’m working on a patient attribute-based search view with a query builder for Apache Spark. I’ll push the changes as soon as the initial version is done.

I came across an article that discusses FHIR analytics (https://www.linkedin.com/pulse/analytics-fhir-chris-grenz). The author uses Apache Drill, a SQL query engine for big data that lets users query FHIR data in files as well as in MongoDB, HBase and S3. It is not a very good fit for us, but it is interesting: it converts FHIR resources into a SQL table-based format, much like we do with Apache Spark using Bunsen.