Skip to content

Commit c194ff9

Browse files
Add Lock to std #3542
1 parent 3e17905 commit c194ff9

File tree

2 files changed

+519
-0
lines changed

2 files changed

+519
-0
lines changed
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
/*
2+
* Copyright 2020-2025 Typelevel
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package cats
18+
package effect
19+
package std
20+
21+
import cats.effect.kernel.*
22+
import cats.effect.kernel.syntax.all.*
23+
import cats.syntax.all.*
24+
25+
import scala.collection.immutable.{Queue => Q}
26+
27+
abstract class Lock[F[_]] { self =>
28+
def shared: Resource[F, Unit]
29+
def exclusive: Resource[F, Unit]
30+
def tryShared: Resource[F, Boolean]
31+
def tryExclusive: Resource[F, Boolean]
32+
def mapK[G[_]](fk: F ~> G)(implicit F: MonadCancel[F, ?], G: MonadCancel[G, ?]): Lock[G] =
33+
new Lock[G] {
34+
override def shared: Resource[G, Unit] = self.shared.mapK(fk)
35+
override def exclusive: Resource[G, Unit] = self.exclusive.mapK(fk)
36+
override def tryShared: Resource[G, Boolean] = self.tryShared.mapK(fk)
37+
override def tryExclusive: Resource[G, Boolean] = self.tryExclusive.mapK(fk)
38+
}
39+
}
40+
object Lock {
41+
42+
/**
43+
* <p>A simple [[Lock]] built on top of [[Semaphore]].
44+
*
45+
* <p>This implementation does '''not''' guarantee writer-priority. Readers can starve writers
46+
* under high contention.
47+
*/
48+
def simple[F[_]](maxShared: Long)(implicit F: GenConcurrent[F, ?]): F[Lock[F]] =
49+
Semaphore[F](maxShared).map { semaphore =>
50+
new Lock[F] {
51+
override def shared: Resource[F, Unit] = semaphore.permit
52+
53+
override def exclusive: Resource[F, Unit] =
54+
Resource.makeFull((poll: Poll[F]) => poll(semaphore.acquireN(maxShared)))(_ =>
55+
semaphore.releaseN(maxShared))
56+
57+
override def tryShared: Resource[F, Boolean] =
58+
Resource.make(semaphore.tryAcquire) {
59+
case true => semaphore.release
60+
case false => F.unit
61+
}
62+
63+
override def tryExclusive: Resource[F, Boolean] =
64+
Resource.make(semaphore.tryAcquireN(maxShared)) {
65+
case true => semaphore.releaseN(maxShared)
66+
case false => F.unit
67+
}
68+
}
69+
}
70+
71+
def apply[F[_]](implicit F: GenConcurrent[F, ?]): F[Lock[F]] =
72+
apply(IdentityProvider.unique)
73+
74+
/**
75+
* <p>A cancellation-safe [[Lock]] implementation that is reentrancy-ready and
76+
* writer-preferential.
77+
*
78+
* <p>This variant uses a user-supplied identity `K` (with [[Eq]]) to enable reentrant
79+
* behavior: repeated acquisitions using the same identity will succeed without blocking.
80+
*
81+
* <p>'''Note''': true fiber-based reentrancy is only possible in [[IO]] via [[IOLocal]],
82+
* where identity can be safely tied to a [[Fiber]].
83+
*/
84+
85+
def apply[F[_], K: Eq](identityProvider: IdentityProvider[F, K])(
86+
implicit F: GenConcurrent[F, ?]): F[Lock[F]] = {
87+
Ref[F].of(State(None, Q.empty[Claim[F, K]], Q.empty[Claim[F, K]])).map { ref =>
88+
new Lock[F] {
89+
override def shared: Resource[F, Unit] =
90+
Claim.resource(identityProvider).flatMap { claim =>
91+
def acquire: F[Unit] = F.uncancelable { poll =>
92+
ref
93+
.modify {
94+
case State(None, _, _) =>
95+
State(Current.Shared(Q(claim)).some, Q.empty, Q.empty) -> Right(())
96+
97+
// Reentrant access
98+
case s @ State(Some(Current.Shared(running)), _, _)
99+
if running.exists(_.identity === claim.identity) =>
100+
s -> Right(())
101+
102+
// Reentrant access
103+
case s @ State(Some(Current.Exclusive(running)), _, _)
104+
if running.identity === claim.identity =>
105+
s -> Right(())
106+
107+
case State(Some(Current.Shared(running)), exclQueue, _)
108+
if exclQueue.isEmpty =>
109+
State(
110+
Current.Shared(running.enqueue(claim)).some,
111+
Q.empty,
112+
Q.empty) -> Right(())
113+
114+
case State(Some(Current.Shared(running)), exclQueue, shrdQueue) =>
115+
State(
116+
Current.Shared(running).some,
117+
exclQueue,
118+
shrdQueue.enqueue(claim)) -> Left(claim)
119+
120+
case State(Some(Current.Exclusive(running)), exclQueue, shrdQueue) =>
121+
State(
122+
Current.Exclusive(running).some,
123+
exclQueue,
124+
shrdQueue.enqueue(claim)) -> Left(claim)
125+
}
126+
.flatMap {
127+
case Right(_) => F.unit
128+
case Left(_) =>
129+
poll(claim.await).onCancel {
130+
ref.update {
131+
case State(curr, exclQueue, shrdQueue) =>
132+
State(
133+
curr,
134+
exclQueue,
135+
shrdQueue.filterNot(_.identity === claim.identity))
136+
}
137+
}
138+
}
139+
}
140+
141+
Resource.make(acquire)(_ => unlockShrd(ref, claim.identity))
142+
}
143+
144+
override def exclusive: Resource[F, Unit] = {
145+
Claim.resource(identityProvider).flatMap { claim =>
146+
def acquire: F[Unit] = F.uncancelable { poll =>
147+
ref
148+
.modify {
149+
case State(None, _, _) =>
150+
State(Current.Exclusive(claim).some, Q.empty, Q.empty) -> Right(())
151+
152+
// Reentrant access
153+
case state @ State(Some(Current.Exclusive(running)), _, _)
154+
if running.identity === claim.identity =>
155+
state -> Right(())
156+
157+
case State(Some(Current.Exclusive(running)), exclQueue, shrdQueue) =>
158+
State(
159+
Some(Current.Exclusive(running)),
160+
exclQueue.enqueue(claim),
161+
shrdQueue) -> Left(claim)
162+
163+
case State(Some(Current.Shared(running)), exclQueue, shrdQueue) =>
164+
State(
165+
Some(Current.Shared(running)),
166+
exclQueue.enqueue(claim),
167+
shrdQueue) -> Left(claim)
168+
}
169+
.flatMap {
170+
case Right(_) => F.unit
171+
case Left(_) =>
172+
poll(claim.await).onCancel {
173+
ref.update {
174+
case State(curr, exclQueue, shrdQueue) =>
175+
State(
176+
curr,
177+
exclQueue.filterNot(_.identity === claim.identity),
178+
shrdQueue)
179+
}
180+
}
181+
}
182+
}
183+
184+
Resource.make(acquire)(_ => unlockExcl(ref, claim.identity))
185+
}
186+
}
187+
188+
override def tryShared: Resource[F, Boolean] =
189+
Claim.resource(identityProvider).flatMap { claim =>
190+
def acquire: F[Boolean] = F.uncancelable { _ =>
191+
ref.modify {
192+
case State(None, _, _) =>
193+
State(Current.Shared(Q(claim)).some, Q.empty, Q.empty) -> true
194+
195+
// Reentrant access: already holds shared or exclusive lock
196+
case s @ State(Some(Current.Shared(running)), _, _)
197+
if running.exists(_.identity === claim.identity) =>
198+
s -> true
199+
200+
case s @ State(Some(Current.Exclusive(running)), _, _)
201+
if running.identity === claim.identity =>
202+
s -> true
203+
204+
case State(Some(Current.Shared(running)), exclQueue, _) if exclQueue.isEmpty =>
205+
State(Current.Shared(running.enqueue(claim)).some, Q.empty, Q.empty) -> true
206+
207+
case State(Some(Current.Shared(running)), exclQueue, shrdQueue) =>
208+
State(
209+
Current.Shared(running).some,
210+
exclQueue,
211+
shrdQueue.enqueue(claim)) -> false
212+
213+
case State(Some(Current.Exclusive(running)), exclQueue, shrdQueue) =>
214+
State(
215+
Current.Exclusive(running).some,
216+
exclQueue,
217+
shrdQueue.enqueue(claim)) -> false
218+
}
219+
}
220+
221+
Resource.make(acquire) {
222+
case true => unlockShrd(ref, claim.identity)
223+
case false => F.unit
224+
}
225+
}
226+
227+
override def tryExclusive: Resource[F, Boolean] =
228+
Claim.resource(identityProvider).flatMap { claim =>
229+
def acquire: F[Boolean] = F.uncancelable { _ =>
230+
ref.modify {
231+
case State(None, _, _) =>
232+
State(Current.Exclusive(claim).some, Q.empty, Q.empty) -> true
233+
234+
case state @ State(Some(Current.Exclusive(running)), _, _)
235+
if running.identity === claim.identity =>
236+
state -> true
237+
238+
case State(Some(Current.Exclusive(running)), exclQueue, shrdQueue) =>
239+
State(
240+
Current.Exclusive(running).some,
241+
exclQueue.enqueue(claim),
242+
shrdQueue) -> false
243+
244+
case State(Some(Current.Shared(running)), exclQueue, shrdQueue) =>
245+
State(
246+
Current.Shared(running).some,
247+
exclQueue.enqueue(claim),
248+
shrdQueue) -> false
249+
}
250+
}
251+
252+
Resource.make(acquire) {
253+
case true => unlockExcl(ref, claim.identity)
254+
case false => F.unit
255+
}
256+
}
257+
}
258+
}
259+
}
260+
261+
private def unlockShrd[F[_], K: Eq](ref: Ref[F, State[F, K]], identity: K)(
262+
implicit F: GenConcurrent[F, ?]): F[Unit] =
263+
ref.modify {
264+
case State(Some(Current.Shared(running)), exclQueue, shrdQueue)
265+
if running.nonEmpty && running.head.identity === identity =>
266+
val (_, remaining) = running.dequeue
267+
268+
if (remaining.nonEmpty) {
269+
State(Some(Current.Shared(remaining)), exclQueue, shrdQueue) -> F.unit
270+
} else {
271+
exclQueue.dequeueOption match {
272+
case Some((next, rest)) =>
273+
State(Some(Current.Exclusive(next)), rest, shrdQueue) -> next.complete.void
274+
275+
case None if shrdQueue.nonEmpty =>
276+
State(Some(Current.Shared(shrdQueue)), Q.empty, Q.empty) ->
277+
shrdQueue.toList.traverse_(_.complete)
278+
279+
case None =>
280+
State(None, Q.empty[Claim[F, K]], Q.empty[Claim[F, K]]) -> F.unit
281+
}
282+
}
283+
case state => state -> F.unit
284+
}.flatten
285+
286+
private def unlockExcl[F[_], K: Eq](ref: Ref[F, State[F, K]], identity: K)(
287+
implicit F: GenConcurrent[F, ?]): F[Unit] = {
288+
ref.modify {
289+
case State(Some(Current.Exclusive(current)), exclQueue, shrdQueue)
290+
if current.identity === identity =>
291+
exclQueue.dequeueOption match {
292+
case Some((next, rest)) =>
293+
State(Some(Current.Exclusive(next)), rest, shrdQueue) -> next.complete.void
294+
295+
case None if shrdQueue.nonEmpty =>
296+
State(Some(Current.Shared(shrdQueue)), Q.empty, Q.empty) ->
297+
shrdQueue.toList.traverse_(_.complete)
298+
299+
case None =>
300+
State(None, Q.empty[Claim[F, K]], Q.empty[Claim[F, K]]) -> F.unit
301+
}
302+
303+
case state => state -> F.unit
304+
}.flatten
305+
}
306+
307+
private case class Claim[F[_], K](gate: Deferred[F, Unit], identity: K) {
308+
def await: F[Unit] = gate.get
309+
def complete: F[Boolean] = gate.complete(())
310+
}
311+
private object Claim {
312+
def resource[F[_], K](identityProvider: IdentityProvider[F, K])(
313+
implicit F: GenConcurrent[F, ?]): Resource[F, Claim[F, K]] =
314+
Resource.eval(identityProvider.next.flatMap(Claim[F, K](_)))
315+
316+
def apply[F[_], K](identity: => K)(implicit F: GenConcurrent[F, ?]): F[Claim[F, K]] =
317+
Deferred[F, Unit].map(Claim(_, identity))
318+
}
319+
320+
trait IdentityProvider[F[_], K] {
321+
def next: F[K]
322+
}
323+
324+
object IdentityProvider {
325+
326+
def constant[F[_]: Applicative, K](k: K): IdentityProvider[F, K] =
327+
new IdentityProvider[F, K] {
328+
def next: F[K] = k.pure[F]
329+
}
330+
331+
def fromFunction[F[_]: Sync, K](fn: () => K): IdentityProvider[F, K] =
332+
new IdentityProvider[F, K] {
333+
def next: F[K] = Sync[F].delay(fn())
334+
}
335+
336+
def fromEffect[F[_], K](fk: F[K]): IdentityProvider[F, K] =
337+
new IdentityProvider[F, K] {
338+
def next: F[K] = fk
339+
}
340+
341+
def unique[F[_]: Unique]: IdentityProvider[F, Unique.Token] =
342+
new IdentityProvider[F, Unique.Token] {
343+
def next: F[Unique.Token] = Unique[F].unique
344+
}
345+
}
346+
347+
private sealed trait Current[F[_], K]
348+
private object Current {
349+
case class Shared[F[_], K](running: Q[Claim[F, K]]) extends Current[F, K]
350+
case class Exclusive[F[_], K](running: Claim[F, K]) extends Current[F, K]
351+
}
352+
353+
private case class State[F[_], K](
354+
current: Option[Current[F, K]],
355+
exclQueue: Q[Claim[F, K]],
356+
shrdQueue: Q[Claim[F, K]])
357+
}

0 commit comments

Comments
 (0)