Use Cats Effect and Doobie to run and connect to a Postgres Docker image

The following program runs a Postgres Docker image with a given password, waits 5 seconds, and then connects to it using Doobie over JDBC:

// Implicit timer backing `IO.sleep`; scheduled on the global execution context.
// The type is spelled out because implicit definitions should not rely on inference.
implicit val timer: cats.effect.Timer[IO] = IO.timer(ExecutionContext.global)

  // Prints a line to stdout tagged as regular output.
  val info: String => Unit = text => println(s"OUT $text")
  // Prints a line to stdout tagged as error output.
  val err: String => Unit = text => println(s"ERR $text")

  /** Suspends printing `value` (followed by a newline) to stdout. */
  def putStrlLn(value: String): IO[Unit] =
    IO.delay(println(value))

  /** Suspends reading one line from stdin. */
  val readLn: IO[String] =
    IO.delay(scala.io.StdIn.readLine())

  /**
   * Runs `cmd` as an external process and completes with its exit code.
   *
   * The process is started on a `Future`; its stdout lines are forwarded to
   * `info` and its stderr lines to `err`, each prefixed with `runId`.
   * Cancelling the returned `IO` destroys the process if it has been started.
   *
   * @param cmd   the command line to execute
   * @param runId identifier prefixed to every log line of this run
   * @return an `IO` yielding the exit code, or failing with whatever the
   *         `Future` running the process failed with
   */
  def cmd(cmd: String, runId: String): IO[Int] = {

    IO.cancelable(
      (cb: (Either[Throwable, Int] => Unit)) => {
        val isCancelled = new AtomicBoolean(false)

        // Shared between the Future's thread and the cancellation token, so it
        // must be published safely. The type is fully qualified because the
        // `sys.process._` import below is local to the Future body.
        val running =
          new java.util.concurrent.atomic.AtomicReference[Option[scala.sys.process.Process]](None)

        val asyncResult = Future {
          import sys.process._
          info(s"${runId} Running `${cmd}`:")

          val log = ProcessLogger(
            (msg) => info(s"${runId}   ${msg}"),
            (msg) => err(s"${runId}   ${msg}")
          )

          // exitValue() blocks until the process terminates.
          scala.concurrent.blocking {
            val started = Process(cmd).run(log)
            running.set(Some(started))

            // Cancellation may have happened before the handle was published;
            // in that case the canceller saw no process, so destroy it here.
            if (isCancelled.get) started.destroy()

            started.exitValue()
          }
        }

        asyncResult.onComplete {
          case Success(value) => cb(Right(value))
          case Failure(e) => cb(Left(e))
        }

        // Cancellation token: set the flag first, then look for a handle. The
        // Future does the opposite (publish handle, then check the flag), so at
        // least one side always observes the other and destroys the process.
        IO {
          isCancelled.set(true)

          running.get.fold(info("No process to cancel"))(_.destroy())

          info("# # # set isCancelled = true")
        }
      }
    )
  }

  /**
   * Runs the Docker image `id` via `cmd`, passing `params` to `docker run`.
   * The container is started with `--rm -i`.
   */
  def docker(id: String, params: String, runId: String) = {
    val commandLine = s"docker run --rm -i ${params} ${id}"
    cmd(commandLine, runId)
  }

  /**
   * Entry point: starts a Postgres container and, after a 5 second delay,
   * runs `select 42` against it through doobie.
   *
   * The container and the query are raced: once the query branch finishes,
   * the `docker` branch is cancelled, which destroys the container process.
   */
  def main(args: Array[String]): Unit = {
    // Implicit so that both `IO.race` and the doobie transactor can pick it up.
    // NOTE(review): doobie releases built on cats-effect 1.x+ require an
    // implicit ContextShift[IO] for `fromDriverManager` — confirm against the
    // doobie version in use.
    implicit val contextShift: cats.effect.ContextShift[IO] = IO.contextShift(global)

    import doobie._
    import doobie.implicits._

    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver", "jdbc:postgresql://localhost:6432/postgres", "postgres", "pwd"
    )

    val program: IO[Unit] = for {
      runId <- FUUID.randomFUUID[IO]
      _ <- IO.race(
        docker("postgres", "-p 6432:5432 -e POSTGRES_PASSWORD=pwd", runId.show),
        // Give the container time to start accepting connections before querying.
        IO.sleep(5.seconds) *> sql"select 42".query[Int].unique.transact(xa).flatMap(
          x => IO(println(x))
        )
      )
      _ <- IO(println("success"))
    } yield ()

    // Nothing happens until the IO is actually run.
    program.unsafeRunSync()
  }

Leave a Reply

Your email address will not be published. Required fields are marked *