Concurrent Coordination
We can get quite far with parMapN and friends, but complex concurrent programs require coordination between different parts that extends beyond returning values. The Cats Effect standard library provides useful tools for communication between concurrent processes. There are also Ref and Deferred, which are part of the kernel and the most basic tools on which many others are built.
Creating Concurrent Tools
We'll use Ref as an example of a concurrent tool. All the others work in the same way.
The simplest way to create a Ref is to use IO.ref.
val ref: IO[Ref[IO, Int]] = IO.ref(1)
The type looks a bit complicated. Unpacking it we have:
- an IO[Stuff], meaning an IO that produces Stuff when run; and
- Stuff is Ref[IO, Int], meaning a Ref that stores an Int and works with IO.
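To make the two layers of the type concrete, here is a minimal sketch of how the outer IO and the inner Ref are used together. The names program and result are illustrative only, and calling unsafeRunSync at the end is just a convenient way to observe the value in a small example.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

// A description of creating a Ref. Nothing is allocated yet.
val ref: IO[Ref[IO, Int]] = IO.ref(1)

// flatMap gives access to the Ref that is produced when the outer IO runs.
// `program` is a hypothetical name used only for this sketch.
val program: IO[Int] =
  ref.flatMap { r =>
    r.update(_ + 41).flatMap(_ => r.get)
  }

// Run the description, for illustration only.
val result: Int = program.unsafeRunSync()
```

The outer IO is handled with flatMap like any other IO, and every operation on the Ref itself (update, get) returns another IO.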
You'll have to get used to these kinds of types when using Cats Effect.
We can also construct a Ref by calling the apply method on the companion object, and then calling of on the result. In this case we have to specify the effect type (which is always IO, for us) to help out type inference.
val ref2 = Ref[IO].of(1)
We could also write out the full type, as below, but this quickly gets tedious.
val ref3: IO[Ref[IO, Int]] = Ref.of(1)
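As a sketch of how the same pattern carries over to another tool, the following constructs a Deferred in two ways and then uses it. The names d1, d2, program, and result are illustrative, and the example assumes Cats Effect 3, where complete returns an IO[Boolean].

```scala
import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global

// Construction via the convenience method on IO.
val d1: IO[Deferred[IO, Int]] = IO.deferred[Int]

// Construction via the companion object, naming the effect and value types.
val d2: IO[Deferred[IO, Int]] = Deferred[IO, Int]

// As with Ref, we flatMap over the description to get at the tool itself.
val program: IO[Int] =
  d1.flatMap { d =>
    // complete sets the value; get waits until a value is available.
    d.complete(42).flatMap(_ => d.get)
  }

// Run the description, for illustration only.
val result: Int = program.unsafeRunSync()
```

In both cases the result of construction is an IO that produces the tool, not the tool itself.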
Exercise: Putting Tools to Use
Complete the challenge in code/src/main/scala/parallelism/02-tools.scala, which gets you to use some of the tools provided by Cats Effect.
- This exercise focuses on the difference between description and action. The code in first uses a description twice, so it gets two different Refs. The code in second uses the same Ref twice, which is usually what you want.
- The following code will do the job.
def generate(ref: Ref[IO, Int]) =
  smallRandomSleep
    .map(_ => random.nextInt(10))
    .flatMap(v => ref.getAndUpdate(a => a + v))
    .replicateA_(100)

def collector(ref: Ref[IO, Int]) =
  IO.sleep(1.second)
    .flatMap(_ => ref.get)
    .flatMap(v => IO.println(s"Value is $v"))

val run =
  ref.flatMap { r =>
    (
      generate(r),
      generate(r),
      generate(r),
      generate(r),
      generate(r),
      collector(r)
    ).parTupled.void
  }
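The difference between description and action mentioned above can be seen in a small, self-contained sketch. This is not the code from the exercise file: the names twoRefs and oneRef are hypothetical stand-ins for the two situations described.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

// A description of creating a Ref that starts at 0.
val ref: IO[Ref[IO, Int]] = IO.ref(0)

// Uses the description twice: each flatMap on `ref` allocates its own Ref,
// so the update is made to one Ref and the read comes from another.
val twoRefs: IO[Int] =
  ref
    .flatMap(r => r.update(_ + 1))
    .flatMap(_ => ref.flatMap(r => r.get))

// Runs the description once and shares the resulting Ref,
// so the read sees the update.
val oneRef: IO[Int] =
  ref.flatMap { r =>
    r.update(_ + 1).flatMap(_ => r.get)
  }

// Run both, for illustration only.
val separate: Int = twoRefs.unsafeRunSync() // 0
val shared: Int = oneRef.unsafeRunSync()    // 1
```

The solution's run value follows the second shape: it calls flatMap on ref once and passes the same r to every generate and to collector.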