Gary Sieling

Importing Data from Solr to Postgres with Scala

I suspect most people who set up Solr indexes pull data from a second system into Solr; having written a previous example where I pulled git data into a Solr index, I copied this data into Postgres to allow comparing the behavior of two full-text indexers.

This is a fairly simple process if you make a Postgres table that has columns named the same as Postgres – in my case I set them all to “character varying”, in the interest of avoiding type conversion issues. If the dataset isn’t too large, you can fix the types later this way (mine is about ~1.6 million rows). This streams the results from Solr, so that you don’t have to worry about memory (while this runs, it does creep up slowly, perhaps some work building up for the garbage collector)

The code that results from this is pretty simple – very similar to the Java equivalent, but more compact and easier to read. One of the awesome things about Scala is that merely importing the JavaConversions namespace fixed a lot of autoboxing issues between Java and Scala, with the exception of the “Float” in streamDocListInfo.

import java.io._
import java.sql._

import scala.collection.JavaConversions._
import collection.immutable._

import org.apache.solr.client.solrj._
import org.apache.solr.client.solrj.impl._
import org.apache.solr.common._
import org.apache.solr.common.params._

object SolrToPostgres extends App {
  val server = new HttpSolrServer(
      "http://localhost:8080/solr/collection1")

  val params = HashMap(  
    "collectionName" -> "collection1",
    "q" -> "*:*",
    "start" -> "0",
    "rows" -> Int.MaxValue.toString)
    
  val solrParams = new MapSolrParams(params)
  
  val connection = DriverManager.getConnection(
      "jdbc:postgresql://127.0.0.1:5432/github", "postgres", "")

  val query = "DELETE FROM data"
  val delete = connection.createStatement
  delete.execute(query)
  
  val callback = new StreamingResponseCallback() {
    def streamDocListInfo(arg0: Long, 
                          arg1: Long, 
                          arg2: java.lang.Float) = {
      
    }

    def streamSolrDocument(doc: SolrDocument) = {
      val query =
        "INSERT INTO " +
        "data   (author, id, email, company, date, message) " +
        "VALUES (?,      ?,  ?,     ?,       ?,    ?,     )"
      
      try {
        val s: PreparedStatement = connection.prepareStatement(query)
        s.setString(1, doc.getFieldValue("author").toString)
        s.setString(2, doc.getFieldValue("id").toString)
        s.setString(3, doc.getFieldValue("email").toString)
        s.setString(4, doc.getFieldValue("company").toString)
        s.setString(5, doc.getFieldValue("date").toString)
        s.setString(6, doc.getFieldValue("message").toString)
        s.setString(7, doc.getFieldValue("name").toString)
        s.setString(8, doc.getFieldValue("github").toString)
        s.setString(9, doc.getFieldValue("search").toString)
        
        s.execute
        
        s.close
      }
      catch
      {
        case e: Exception =>
        println(e.getMessage)
        println(e.getStackTrace)
      }
    } 
  }
  
  server.queryAndStreamResponse(solrParams, callback)
  connection.close
}
Exit mobile version