It’s bad to use vars to define state that’s going to be shared by multiple threads, the same
goes for using vars for using vars for sharing state amongst multiple effects.

object Refs extends IOApp {

  var counter = 0
  def tickingClocks(name : String) : IO[Unit] = {
    for {
      _ <- IO(println(s"[$name] : ${System.currentTimeMillis()}"))
      _ =  counter = counter + 1
      _ <- IO.sleep(1 second)
      _ <- tickingClocks(name)
    } yield ()
  }

  def printCounter() : IO[Unit] =
    for {
      _ <- IO(println(s"counter is $counter"))
      _ <- IO.sleep(1 second)
      _ <- printCounter()
    } yield ()

  val program : IO[ExitCode] = (tickingClocks("first clock"), tickingClocks("second clock"), printCounter()).parTupled.as(ExitCode.Success)

  override def run(args: List[String]): IO[ExitCode] = program
}

Running the above code will result in lost updates as the counter will never be accurate.

Although there are Atomic classes to solve these problems, they are not functional structures. Cats effect provides a functional wrapper around Atomic Classes called Ref .It has API’s similar to Atomic Classes.

We can modify our example to use a Ref and guarantee that state is safely shared amongst effects and ultimately threads.

 def tickingClocks(name : String, counter : Ref[IO,Long]) : IO[Unit] = {
      for {
        _ <- IO(println(s"[$name] : ${System.currentTimeMillis()}"))
        _ <- counter.update(_ + 1)
        _ <- IO.sleep(1 second)
        _ <- tickingClocks(name,counter)
      } yield ()
    }

    def printCounter(counter : Ref[IO,Long]) : IO[Unit] =
      for {
        counterValue <- counter.get
        _            <- IO(println(s"counter is $counterValue"))
        _            <- IO.sleep(1 second)
        _            <- printCounter(counter)
      } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    for {
      ref <- Ref[IO].of(0L)
      _   <- (tickingClocks("first clock",ref), tickingClocks("second clock",ref), printCounter(ref)).parTupled.as(ExitCode.Success)
    } yield ExitCode.Success

Deferred

Now, let’s imagine we want to alert the user when the counter hits 13. One way to do this is to check for the value each time we update the counter. A better way to do this is by the use of the Deferred data type.

Deffered is a functional concurrency construct that sleeps or halts execution of subsequent effect until the task or value that was deferred completes. We can think of it as a Promise in Scala, that is completed elsewhere, could be another effect or effectively, another thread.

Deferred gives us the ability to serialize the execution of an effect with respect to some newly-produced state

By using Deferred, we can separate the task to run upon completion from the task that completes the Deferred.

In our previous example, we’ll use the Deferred data type to alert us when the counter reaches 13, that will be separate from other tasks.

    def tickingClocks(name : String, counter : Ref[IO,Long], alerter : Deferred[IO,Unit]) : IO[Unit] = {
      for {
        _            <- IO.sleep(1 second)
        _            <- IO(println(s"[$name] : ${System.currentTimeMillis()}"))
        counterValue <- counter.updateAndGet(_ + 1)
        _            <- if (counterValue >= 13) alerter.complete(()).attempt.void else IO.unit
        _            <- tickingClocks(name,counter,alerter)
      } yield ()
    }

  def alertIf13(is13 : Deferred[IO,Unit]) : IO[Unit] = {
    for {
      _ <- is13.get
      _ <- IO(println("ALERT!!!!!!!"))
    } yield ()
  }

    def printCounter(counter : Ref[IO,Long]) : IO[Unit] =
      for {
        counterValue <- counter.get
        _            <- IO(println(s"counter is $counterValue"))
        _            <- IO.sleep(1 second)
        _            <- printCounter(counter)
      } yield ()

  override def run(args: List[String]): IO[ExitCode] =
    for {
      ref     <- Ref[IO].of(0L)
      alerter <- Deferred[IO,Unit]
          _   <- (
            tickingClocks("first clock",ref, alerter), 
            tickingClocks("second clock",ref, alerter),
            alertIf13(alerter), 
            printCounter(ref)
            ).parTupled
    } yield ExitCode.Success

In this example, we complete the Deferred with Unit in the tickingClocks while we use it in the alertIf13 function.

We used an IO#attempt on the call to complete the Deferred because our tickingClocks function is recursive and will attempt to call complete on an already completed Deferred which will lead to an IllegalStateException

Concurrent State Machines

An example used by the book was to design a functional Countdown latch using these functional concurrency structures.

To adapt to the parallel ticking clock example, here’s how the latch could work:

def run(args: List[String]): IO[ExitCode] =
    for {
      latch <- CountdownLatch(13)
      _     <- (
            tickingClocks("first clock", latch),
            tickingClocks("second clock", latch),
            alertIf13(latch),
            ).parTupled
    } yield ExitCode.Success

Now, this was my initial implementation which didn’t work because for some reason, calling complete on the Deferred didn’t work:

object CountdownLatch {
    def apply(n : Int) : IO[CountdownLatch] = {
      require(n > 0 , "number of latches should be greater than 0")
      for {
        latchSignal <- Deferred[IO,Unit]
        latchState   <- Ref[IO].of[LatchState](CountingDown(n))
      } yield new CountdownLatch {
        override def await(): IO[Unit] =
          latchSignal.get

        override def decrement(): IO[Unit] = {
          latchState.update {
            case CountingDown(1) =>
              // last latch
              latchSignal.complete(())
              Done

            case res @CountingDown(count) =>
              res.copy(count - 1)

            case Done =>
              Done
          }
        }
      }
    }
  }

After a long time spent debugging the decrement function, I saw what the problem was, it was this line:

case CountingDown(1) => // last latch latchSignal.complete(()) Done

I had totally forgotten that we were dealing with IO values, so to fulfil the update method function signature A => A, I added a Done after the latchSignal.complete(()). But that method isn’t going to be run because it’s an IO value.

The IO runtime won’t run it because it wasn’t returned, the IO computation as expected was just a description of computation and not the execution of the actual computation.

Now, after figuring this out, here was my next implementation for the decrement function:

override def decrement(): IO[Unit] = {
  latchState.updateAndGet {
    case CountingDown(1) | Done =>
      // last latch
      Done

    case res @CountingDown(count) =>
      res.copy(count - 1)
  }.flatMap {
    case Done =>
      // used attempt because of multiple calls to attempt
      latchSignal.complete(()).attempt.void
    case _ => IO.unit
  }
}

This worked, but i realized that since the ticking clock was continuous, the complete method of the latch was being called multiple times, throwing an error.

Then I remembered there was the modify method on the Ref that enabled returning some other state B and using that method seemed to totally solve the problem as seen below:

object CountdownLatch {
    def apply(n: Int): IO[CountdownLatch] = {
      require(n > 0, "number of latches should be greater than 0")
      for {
        latchSignal <- Deferred[IO, Unit]
        latchState <- Ref[IO].of[LatchState](CountingDown(n))
      } yield new CountdownLatch {
        override def await(): IO[Unit] =
          latchSignal.get

        override def decrement(): IO[Unit] = {
          latchState.modify {
            case CountingDown(1) =>
                // last latch
                Done -> latchSignal.complete(())

            case res@CountingDown(count) =>
              res.copy(count - 1) -> IO.unit

            case Done =>
              Done -> IO.unit

          }.flatten
        }
      }
    }
  }

which coincidentally was similar to the answer in the book :)