This repository was archived by the owner on Oct 19, 2024. It is now read-only.

Support gRPC server stream cancellation #334

Open
@fintara


Here is an example: a stream that produces one number per second and should stop when the client stops listening.

syntax = "proto3";
package arithmetic;

service Arithmetic {
  rpc Range (Empty) returns (stream OneInt) {}
}

message Empty {}

message OneInt {
  int32 value = 1;
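}

-- Requires the BangPatterns and NumericUnderscores extensions.

-- | Hold each element for t microseconds before passing it downstream.
delay :: (MonadIO m) => Int -> ConduitT i i m ()
delay t = awaitForever $ \i -> liftIO (threadDelay t) >> yield i

-- | Produce 1, 2, 3, ... one per second; only values <= 5 or >= 15 reach the client.
range :: (MonadServer m) => Empty -> ConduitT OneInt Void m () -> m ()
range _ sink = runConduit $ source 1 .| delay 1_000_000 .| filterC good .| mapC OneInt .| sink
  where
    good x = x <= 5 || x >= 15
    -- Infinite counter that logs every value it produces, filtered or not.
    source !x = do
      liftIO $ putStrLn $ "Request " <> show x
      yield x
      source (x + 1)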

data Empty = Empty
  deriving (Eq, Show, Generic)
  deriving (ToSchema ArithmeticSchema "Empty", FromSchema ArithmeticSchema "Empty")

newtype OneInt
  = OneInt { value :: Int32 }
  deriving (Eq, Show, Generic)
  deriving (ToSchema ArithmeticSchema "OneInt", FromSchema ArithmeticSchema "OneInt")

If the client stops listening (Ctrl+C) while the values between 5 and 15 are being filtered out, the stream keeps running until it hits yield 16. The disconnect seems to be noticed only when a value actually reaches the sink, so a stream that never yields again can potentially run forever.

Essentially, I would like to know when the client has stopped listening (for any reason), like here for Python, so that I know when to stop iterating (or, better yet, have the stream stop automatically).
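
To make the request concrete, here is a rough sketch of what stopping could look like if such a signal were available. It assumes a hypothetical cancellation flag (an `IORef Bool`) that something outside the stream sets when the client goes away; nothing here is an existing mu-haskell API, and `untilCancelled` and `demo` are made-up names. The stage checks the flag before pulling each element, so it sits upstream of the filter and notices the cancellation even while values are being dropped.

```haskell
import Conduit
import Control.Monad (unless, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Pass elements through until the (hypothetical) cancellation flag is set.
-- The flag is checked before every pull from upstream, so an infinite
-- source is simply never asked for another element once it is True.
untilCancelled :: MonadIO m => IORef Bool -> ConduitT i i m ()
untilCancelled flag = loop
  where
    loop = do
      cancelled <- liftIO (readIORef flag)
      unless cancelled $ do
        next <- await
        case next of
          Nothing -> pure ()          -- upstream finished on its own
          Just x  -> yield x >> loop  -- forward the element and re-check

-- | Simulation only: the "client" disconnects when the value 8 is produced,
-- i.e. in the middle of the range that the filter drops.
demo :: IO [Int]
demo = do
  flag <- newIORef False
  runConduit $
    yieldMany [1 ..]
      .| untilCancelled flag
      .| iterMC (\x -> when (x == 8) (writeIORef flag True))
      .| filterC good
      .| sinkList
  where
    good x = x <= 5 || x >= 15
```

In this simulation `demo` terminates with `[1,2,3,4,5]` instead of running on to 15 and beyond. In the `range` handler above the same stage would go right after `source 1`; the missing piece is still who sets the flag when the client disconnects.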

Any ideas how to achieve this?
