There and Back Again: Turning Haskell functions into RPC calls, Part 1
This post follows from One Serialization Class to Rule Them All.
While developing a client library for Temporal, I found that client libraries in the officially supported languages (Go, Java, PHP, TypeScript, .NET, and Python) all support turning native functions into Workflow and Activity invocations. For example, in TypeScript, you can write a workflow like this, and the Temporal server can invoke it remotely:
type ExampleArgs = {
  name: string;
};

export async function example(
  args: ExampleArgs,
): Promise<{ greeting: string }> {
  const greeting = await greet(args.name);
  return { greeting };
}
This is a very convenient way to write code that can be invoked remotely, as it's just plain code! I wanted writing Temporal workflows in Haskell to retain that low-boilerplate spirit, so I set out to see if I could do the same thing there. To support invoking arbitrary functions from a remote source, we need to be able to serialize and deserialize the function arguments and results. In the previous post, we saw how to use the Codec type class to serialize a value using the optimal codec. We'll leverage that here to serialize and deserialize function arguments and results.
So, how can we turn a function into a version of itself that can be invoked via its serialized arguments? Let's start with a sample echo function:
echo :: String -> IO String
echo = pure
First, we need some way to represent a stable reference to this function. Past work on things like Cloud Haskell led to the creation of StaticPtr, which at face value seems like pretty much what we need. However, StaticPtr only works when the communicating processes are running the same build:
The set of keys used for locating static pointers in the Static Pointer Table is not guaranteed to remain stable for different program binaries. Or in other words, only processes launched from the same program binary are guaranteed to use the same set of keys.
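To make that limitation concrete, here is a minimal sketch of what using StaticPtr looks like, built on GHC's StaticPointers extension and the GHC.StaticPtr module. The names echoPtr, echoKey, and callViaPtr are made up for illustration.

```haskell
{-# LANGUAGE StaticPointers #-}

import GHC.StaticPtr (StaticKey, StaticPtr, deRefStaticPtr, staticKey)

echo :: String -> IO String
echo = pure

-- 'static' needs a closed expression, such as a top-level identifier.
echoPtr :: StaticPtr (String -> IO String)
echoPtr = static echo

-- The key is the part that would travel over the wire. GHC assigns it,
-- and it is only guaranteed to be meaningful to the same program binary.
echoKey :: StaticKey
echoKey = staticKey echoPtr

-- Within a single process, dereferencing hands the original function back.
callViaPtr :: String -> IO String
callViaPtr = deRefStaticPtr echoPtr
```

Inside one process this round-trips fine. The trouble only starts when echoKey is sent to a process built from a different revision of the code.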
In real-world systems, we have to do things like perform zero-downtime deployments, where multiple revisions of a system run concurrently, so that's not really going to work for us. Instead, let's make our own reference type:
data RpcRef (args :: [Type]) result = RpcRef
  { rpcRefName :: Text
  }
For now, we'll just keep track of the argument types and result type at the type level. We'll give the reference a name that we come up with ourselves; it doesn't matter too much what it is yet.
Next, we need a way to capture the types of the arguments and result of a function. A type family will let us pattern match on one argument application at a time:
type family ArgsOf f where
  ArgsOf (arg -> rest) = arg ': ArgsOf rest
  ArgsOf result = '[]
Here, we "pattern match" on the type of f, and if it's a function arrow, we add the argument type to the list of argument types. If it's anything but a function arrow, then we know there aren't any args left.
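One way to convince yourself that the type family reduces as described is to ask the compiler to prove a few equalities. The sketch below restates ArgsOf with explicit kind annotations (my addition) and uses Refl from Data.Type.Equality as a compile-time check. The names echoArgs, twoArgs, and noArgs are made up for illustration.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)
import Data.Type.Equality ((:~:) (..))

type family ArgsOf (f :: Type) :: [Type] where
  ArgsOf (arg -> rest) = arg ': ArgsOf rest
  ArgsOf result = '[]

-- Each Refl only typechecks if both sides reduce to the same type.
echoArgs :: ArgsOf (String -> IO String) :~: '[String]
echoArgs = Refl

twoArgs :: ArgsOf (Int -> Bool -> IO ()) :~: '[Int, Bool]
twoArgs = Refl

-- No arrows at all means an empty argument list.
noArgs :: ArgsOf (IO Int) :~: '[]
noArgs = Refl
```

If you only want to poke at it interactively, `:kind! ArgsOf (Int -> Bool -> IO ())` in GHCi reports the reduced type as well.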
We'll do something similar with the result type. One difference is that, from the perspective of making an RPC call, we don't really need to care about the monad or functor that the function operates in. Obviously, the remote service needs to care about that, but it's not interesting for us. So we'll pass the parameter m to the type family, and if the function is operating in the correct monad, we'll return the result type. If it's not, we'll return a type error. [1]
type family ResultOf (m :: Type -> Type) f where
  ResultOf m (arg -> rest) = ResultOf m rest
  ResultOf m (m result) = result
  ResultOf m result = TypeError
    ( 'Text "This function must use the (" ':<>: 'ShowType m ':<>: 'Text ") monad."
      ':$$: ('Text "Current type: " ':<>: 'ShowType result)
    )
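The same Refl trick works for checking ResultOf. This is a sketch that restates the family with an explicit kind on f and enables UndecidableInstances, which I'm assuming GHC wants for the TypeError equation. The names echoResult and maybeResult are made up for illustration.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}

import Data.Kind (Type)
import Data.Type.Equality ((:~:) (..))
import GHC.TypeLits (ErrorMessage (..), TypeError)

type family ResultOf (m :: Type -> Type) (f :: Type) :: Type where
  ResultOf m (arg -> rest) = ResultOf m rest
  ResultOf m (m result) = result
  ResultOf m result = TypeError
    ( 'Text "This function must use the (" ':<>: 'ShowType m ':<>: 'Text ") monad."
      ':$$: ('Text "Current type: " ':<>: 'ShowType result)
    )

-- The arrows are skipped, then the monad is peeled off the final result.
echoResult :: ResultOf IO (String -> IO String) :~: String
echoResult = Refl

-- m is a parameter, so the same family works for other monads.
maybeResult :: ResultOf Maybe (Int -> Bool -> Maybe Char) :~: Char
maybeResult = Refl

-- Asking for IO when the function lives in Maybe should be rejected at
-- compile time with the custom message, so this stays commented out:
-- wrongMonad :: ResultOf IO (String -> Maybe String) :~: String
-- wrongMonad = Refl
```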
Cool, now we can turn a function into a reference to itself:
rpcRef :: Text -> f -> RpcRef (ArgsOf f) (ResultOf IO f)
rpcRef name _ = RpcRef name
Wait, you might be thinking, we can't actually do anything with this reference besides get a name out of it. And you'd be right! In order to support invoking the function and returning the result, we need a serialization codec of some sort that all of the parameters and the result can be serialized with. We'll use the Codec type class from the previous post to do this. We also need some way to actually gather all of the arguments in order to encode them. It would also be nice if we could pass the arguments in using standard Haskell function application; we don't want the user to have to do anything special to invoke the function.
Let's define another type family that can expand the args and result back into a function:
type family (:->:) (args :: [Type]) (result :: Type) where
  (:->:) '[] result = result
  (:->:) (arg ': args) result = arg -> (args :->: result)
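As a quick sanity check, the sketch below shows the expansion going the other way: a type-level list plus a result turns back into an ordinary function type. I've written the family in infix form with explicit kinds, and expandsToFunction and repeatName are hypothetical names used only for this example.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)
import Data.Type.Equality ((:~:) (..))

type family (args :: [Type]) :->: (result :: Type) :: Type where
  '[] :->: result = result
  (arg ': args) :->: result = arg -> (args :->: result)

-- The list of arguments expands into a curried function type.
expandsToFunction :: ('[Int, String] :->: IO Bool) :~: (Int -> String -> IO Bool)
expandsToFunction = Refl

-- Because the family reduces to a plain function type, a definition with
-- this signature can take its arguments the usual way.
repeatName :: '[String, Int] :->: IO String
repeatName name times = pure (concat (replicate times name))
```

This is what lets callers use normal function application later on: the caller never sees the type-level list, only the function type it expands to.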
Now we can define what the ideal function invocation would look like. We'd like a reference that hides all of the serialization codec logic behind a nice façade, so that we can just pass in the arguments and get a result back:
runActivity :: RpcRef args result -> (args :->: IO result)
From our previous post on serialization, we have a type class Codec that we use to encode values into, and decode values from, RawPayloads. We'll use that here to gather up our arguments:
-- Things we're using from the previous post.
class Codec fmt a where
  ...
  decode :: RawPayload -> Either String a

encode :: forall fmt a. Codec fmt a => fmt -> a -> RawPayload

data RawPayload = RawPayload
  { inputPayloadData :: ByteString
  , inputPayloadMetadata :: Map Text ByteString
  } deriving (Eq, Show)
We'll use the fact that args is a type-level list of Type to our advantage here. Each argument in the list must satisfy Codec codec arg, so we return a function that takes an argument of the correct type, encodes it, adds it to the list of arguments, and recurses to do the same thing for the rest of the arguments. Once the list is empty, we call f, which is the function in charge of doing the actual IO work, such as making network calls, for RPC purposes.
class GatherArgs codec (args :: [Type]) where
  gatherArgs
    :: Proxy args
    -> codec
    -> ([RawPayload] -> [RawPayload])
    -> ([RawPayload] -> result)
    -> (args :->: result)

instance (Codec codec arg, GatherArgs codec args) => GatherArgs codec (arg ': args) where
  gatherArgs _ c accum f = \arg ->
    gatherArgs
      (Proxy @args)
      c
      (accum . (encode c arg :))
      f

instance GatherArgs codec '[] where
  gatherArgs _ _ accum f = f $ accum []
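To see the accumulation in action without any networking, here is a self-contained sketch. It swaps in simplified stand-ins for the pieces from the previous post: a RawPayload that just wraps a String, a one-method Codec class, and a toy ShowCodec that encodes with show. All three are assumptions made for this example, not the real definitions, and collect is a made-up name.

```haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}

import Data.Kind (Type)
import Data.Proxy (Proxy (..))

-- Simplified stand-ins (assumptions), not the definitions from the previous post.
newtype RawPayload = RawPayload String deriving (Eq, Show)

class Codec fmt a where
  encode :: fmt -> a -> RawPayload

-- A toy codec that encodes anything showable via 'show'.
data ShowCodec = ShowCodec

instance Show a => Codec ShowCodec a where
  encode _ = RawPayload . show

type family (args :: [Type]) :->: (result :: Type) :: Type where
  '[] :->: result = result
  (arg ': args) :->: result = arg -> (args :->: result)

class GatherArgs codec (args :: [Type]) where
  gatherArgs
    :: Proxy args
    -> codec
    -> ([RawPayload] -> [RawPayload])
    -> ([RawPayload] -> result)
    -> (args :->: result)

instance (Codec codec arg, GatherArgs codec args) => GatherArgs codec (arg ': args) where
  -- Take one argument, append its encoding, and recurse on the remaining args.
  gatherArgs _ c accum f = \arg ->
    gatherArgs (Proxy @args) c (accum . (encode c arg :)) f

instance GatherArgs codec '[] where
  -- No arguments left: hand the collected payloads to the continuation.
  gatherArgs _ _ accum f = f (accum [])

-- With 'id' as the continuation, we simply get the encoded arguments back.
collect :: Int -> String -> Bool -> [RawPayload]
collect = gatherArgs (Proxy @'[Int, String, Bool]) ShowCodec id id
```

Calling `collect 1 "two" True` yields the three encoded payloads in argument order, because each step composes its payload onto the end of the difference list held in accum.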
Let's update our RpcRef to carry the type class dictionary that the compiler needs to actually serialize and deserialize things! rpcRef also needs to take a Codec as an argument now.
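data RpcRef (args :: [Type]) result = forall codec.
  ( Codec codec result
  , GatherArgs codec args
  ) => RpcRef
  { rpcRefName :: Text
  , rpcRefCodec :: codec
  }

-- The constraints let the constructor capture both dictionaries.
-- The function itself is only used for its type, so it is ignored.
rpcRef
  :: (Codec codec (ResultOf IO f), GatherArgs codec (ArgsOf f))
  => Text -> codec -> f -> RpcRef (ArgsOf f) (ResultOf IO f)
rpcRef name fmt _ = RpcRef name fmt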
Now we can send and receive calls over the wire! This is a contrived example, but it shows how we use gatherArgs in practice.
-- The explicit forall (with ScopedTypeVariables) brings args into scope for Proxy @args.
runActivity :: forall args result. RpcRef args result -> (args :->: IO result)
runActivity (RpcRef _ codec) = gatherArgs (Proxy @args) codec id $ \capturedPayloads -> do
  -- Do something with the payloads. I'm just making this bit up.
  resultBody <- post "https://foo.bar/rpc" $ Data.Binary.encode capturedPayloads
  case Data.Binary.decode resultBody of
    Left err -> throwIO $ RpcBodyDecodeError err
    Right payload -> case decode codec payload of
      Left err -> throwIO $ RpcResultDecodeError err
      Right result -> pure result

echoRef :: RpcRef '[String] String
echoRef = rpcRef "echo" (Codec @JSON) echo

echoFromFarAway :: String -> IO String
echoFromFarAway = runActivity echoRef
If we call echoFromFarAway "hello", we'll get back a String that says "hello".
In the next post, we'll look at how we handle the server side of things! Part 2
Footnotes
[1] It might be tempting to remove the m parameter from ResultOf and do something like ResultOf (m a) = a, but if you use something like ReaderT env, this introduces some gnarly impredicativity issues. m serves as a type witness to work around it.