temporalio/documentation

[Support] Add or update grpc headers

Closed this issue · 32 comments

Hello, for this part: https://docs.temporal.io/security?lang=typescript#authorize-api-calls
Have there been any updates to this part? Is there any way to update the headers of gRPC requests with the TypeScript SDK?

To set static headers use https://typescript.temporal.io/api/interfaces/client.ConnectionOptions#metadata and https://typescript.temporal.io/api/interfaces/worker.NativeConnectionOptions#metadata.

Client and Connection objects expose a withMetadata helper that can be used to set metadata in an async context.

NativeConnection exposes setMetadata which can be used to dynamically update the gRPC headers.

Thanks @bergundy. But setMetadata is not a static method of NativeConnection; it needs an instance of NativeConnection first, and getting the connection requires an await. So is this a singleton design? Will the connection be the same instance if the parameters passed to NativeConnection.connect are the same?

export async function getConnect() {
  const connection = await NativeConnection.connect({
    address:"",
    tls: {
    },
  });
  return connection
}

Can calling the function above return the singleton connection in different activities? And can I then call connection.setMetadata() to change the gRPC header?

Can calling the function above return the singleton connection in different activities?

NativeConnection is only used to create Workers. If activities need to make client calls, then they need to use a Connection object. NativeConnection and Connection are two distinct classes, used in two different contexts, and you can't convert from one to the other.

getting the connection requires an await. So is this a singleton design? Will the connection be the same instance if the parameters passed to NativeConnection.connect are the same?

The SDK itself does not maintain a Connection/NativeConnection pool. Every call to NativeConnection.connect(...) or Connection.connect(...) effectively establishes a new gRPC connection to the Temporal server.

There is a non-negligible cost to establishing new gRPC connections and keeping them open, on both the client side and the server side. We therefore highly recommend that you establish as few connections as possible. Multiple Workers can reuse the same NativeConnection, and multiple Clients can reuse the same Connection.
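
Since the SDK does not pool connections, any sharing has to be arranged in application code. One possible way, sketched here, is to cache the connection promise so that every caller awaits the same connection. The `once` helper is illustrative and not part of the SDK.

```typescript
// Wrap an async factory so that it runs at most once; every caller gets the
// same promise, and therefore the same resolved value.
function once<T>(factory: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (cached === undefined) {
      cached = factory();
    }
    return cached;
  };
}
```

For example, `const getConnection = once(() => NativeConnection.connect({ address }))` (with `getConnection` and `address` as hypothetical names) would let several Workers pass `await getConnection()` as their `connection` option. Note that a rejected promise is cached too, so a failed first attempt would need separate handling.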

Thanks so much, @mjameswh.
I know that NativeConnection is used for creating Workers, and that a metadata parameter can be passed to set the gRPC header.
The gRPC header can then be changed with setMetadata, but setMetadata needs to be called on a NativeConnection instance.
The activities don't need to make client calls.
The activities (local activities used by Workflow inbound and outbound interceptors) want to change the gRPC headers. How can I get the NativeConnection instance that was used to create the worker?
For example:
The NativeConnection instance is in the worker.ts file, where the worker is created.
But I want to change the gRPC header in an activity file (actually a local activity used in an interceptor for authentication).
How can I get, in the activities.ts file, the NativeConnection instance that was created in the worker.ts file?

In short. How to get the NativeConnection instance in an activity execution? @mjameswh @bergundy, Thanks.

export class WorkflowAuthInterceptor
    implements WorkflowInboundCallsInterceptor {
    public async execute(
        input: WorkflowExecuteInput,
        next: Next<WorkflowInboundCallsInterceptor, 'execute'>
    ): Promise<unknown> {       
        console.log("Workflow Type", workflowInfo().workflowType);
        console.log("Workflow ID", workflowInfo().workflowId);
        await localActivity1(input.headers);
        localActivity2(workflowInfo().workflowType);
        return await next(input);
    }
}

How can I get the NativeConnection instance in localActivity1 or localActivity2 in this interceptor?

But I want to change the gRPC header in an activity file (actually a local activity used in an interceptor for authentication).

Sorry for insisting, but that sounds like an unusual thing to do. Not saying that there are never use cases for this, but I just want to confirm that you are working in the right direction... Why do you need the Worker's connection to change its headers at runtime, from one of its own activities? Are you developing something to support dynamically starting/reconfiguring/stopping Workers from a single process?

How to get the NativeConnection instance in an activity execution?

The SDK itself provides no way to obtain the Worker's NativeConnection from either an interceptor or an activity. However, you can easily inject the connection into your activities yourself.

For example, you could do something like this:

In worker.ts:

  // connect() returns a Promise, so it must be awaited before use
  const nativeConnection = await NativeConnection.connect({ ... });

  const worker = await Worker.create({
    connection: nativeConnection,
    taskQueue: 'dependency-injection',
    workflowsPath: require.resolve('./workflows'),
    activities: createActivities(nativeConnection),
  });

In activities.ts:

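  import { NativeConnection } from '@temporalio/worker';

  // Factory that closes over the Worker's connection so activities can use it
  export function createActivities(nativeConnection: NativeConnection) {
    return {
      async activity1() {
        await nativeConnection.setMetadata(...);
      },

      async activity2() {
      },
    };
  }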

Thanks, @mjameswh

Are you developing something to support dynamically starting/reconfiguring/stopping Workers from a single process?

No, I am not working on this. I am working on authentication with a token in the gRPC header.
On the worker side, I need to get information from the client side and call the authentication token API with that information.
I am doing this with interceptors, but because of the determinism constraints, I am using local activities in the interceptors to get the tokens, just as we discussed in this issue: temporalio/sdk-typescript#1135
After getting the token, I need to set it in the gRPC header to authenticate activity execution.
That's why I need to add or update (when the token expires) the gRPC header in a local activity.
So the local activities are called by the interceptors.
I think using interceptors to handle the gRPC headers is reasonable.

In activities.ts:

If activities.ts is written as a createActivities function as you showed and passed to the Worker creation options, how do I use it in
a Workflow or an interceptor with proxyActivities or proxyLocalActivities?
I assume the code in the Workflow file will look like this:

import { createActivities } from './activities';
const activity = createActivities(nativeConnection) // How to get nativeConnection in this file?
const { activity1} = proxyActivities<typeof activity>({
  startToCloseTimeout: '1m',
  scheduleToCloseTimeout: '2m'
});

Is there any way to pass the NativeConnection instance into the workflow or interceptor? I think it would be better to get the NativeConnection instance there and then pass it into the activity execution.

Hello, @mjameswh
Any suggestions about the issue above? Thanks.

Interceptors don't manipulate gRPC headers, they manipulate the persisted headers in workflow history like this one.

Interceptors in workflows are also subject to deterministic constraints and run inside the sandbox.

I'm still unclear on what it is you're trying to achieve. If you're trying to authenticate your worker, create a NativeConnection object with the options mentioned above and update the metadata dynamically when the token changes (also mentioned above). There's no need for activity code to access the NativeConnection object unless you're instantiating workers in your activities.

Thanks @bergundy.
Not just to authenticate my worker, but also to authenticate the gRPC requests the worker makes to execute activities.
So every time the worker needs to execute activities, a JWT token should be put into the gRPC header.
And this token is different from the token used for worker authentication.
The JWT token in the gRPC header needs to be obtained from a service, and the request to that service needs parameters from the persisted headers in workflow history like this one.
So the interceptors are used to get the parameters for the token request.

The steps I am following for Temporal-related authentication:

  1. Get data from the persisted headers in workflow history
  2. Request a token from a token service with the data above
  3. Put the token into the grpc header when executing activities
  4. The token may expire, so it needs to be updated when it expires.

Put the token into the grpc header when executing activities

You don't have this level of control from a workflow. You'll need to use the built-in Temporal headers to pass auth information to your activities; only the worker or client can authenticate with the server via gRPC headers.

Thanks @bergundy .
But when creating the worker, there is not enough data available to use as parameters for getting the token.

I think you're approaching it the wrong way.

Workers require permissions to poll and complete activity and workflow tasks.
They can't operate without the ability to access these APIs.

Often, activities do not run in the same process as the workflow that started them. When a workflow task completes, the worker sends a request to the server that may contain a list of commands, and some of those commands may be requests to start activities. The server creates state machines to manage these activities and dispatches activity tasks to arbitrary workers to start processing them.
It's not the activity that needs to authenticate with Temporal, it's the worker running that activity.

Thanks so much @bergundy .
In my situation, the worker needs to be authenticated when it polls for activities.
It also needs to be authenticated when it is created and registered.
And the tokens used in these two situations are different.

They can't be different: a worker uses the token to poll for and complete tasks for activities and workflows. There's only one token that can be used per worker, and there's no way to set gRPC headers per activity or workflow.

Thanks @bergundy .
Is the connection between a Worker and the Temporal server some kind of long-lived connection?
Say a token is needed when a Worker is created and registered, but the token expires after 30 minutes.
Does the token need to be updated for the Worker?

Yes, the connection may be kept alive for a long time. If you need to refresh the token use the https://typescript.temporal.io/api/classes/worker.NativeConnection#setmetadata method.

If you need to refresh the token use the https://typescript.temporal.io/api/classes/worker.NativeConnection#setmetadata method.

Hello, @bergundy .
If the token used for Worker creation needs to be checked for expiry on every polling request,
where should I call the setMetadata method? Or is there an event for catching the polling result, so that I can call setMetadata in the event handler?

If the worker cannot communicate with the server for over a minute, it will shut itself down with an exception.
You should avoid this and handle token refresh externally. Unfortunately this is a bit complicated, but AFAICT, having a background routine in your program that periodically refreshes the token (e.g. every few minutes) and calls setMetadata is the simplest way you have today to get this working.

In the future, we'd like to power the worker with a pure JS connection (instead of the native Rust one), where you can inject grpc interceptors to handle this better.
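
The background refresh routine described above could be sketched roughly as follows. The sketch depends only on a `setMetadata(metadata)` method that returns a promise, which is how `NativeConnection.setMetadata` is understood to behave. `MetadataSink`, `TokenProvider`, `refreshOnce` and `startTokenRefresh` are hypothetical names, and the `authorization` header with a `Bearer` prefix is an assumption about what the server-side authorizer expects.

```typescript
// Stand-in for the single NativeConnection method this sketch relies on.
interface MetadataSink {
  setMetadata(metadata: Record<string, string>): Promise<void>;
}

// Hypothetical: how a token is obtained is application-specific.
type TokenProvider = () => Promise<string>;

// Fetch a fresh token once and push it to the connection as a gRPC header.
// Errors propagate to the caller.
async function refreshOnce(sink: MetadataSink, getToken: TokenProvider): Promise<void> {
  const token = await getToken();
  await sink.setMetadata({ authorization: `Bearer ${token}` });
}

// Refresh immediately, then again every intervalMs milliseconds.
// Returns a function that stops the periodic refresh.
function startTokenRefresh(
  sink: MetadataSink,
  getToken: TokenProvider,
  intervalMs: number,
  onError: (err: unknown) => void = console.error,
): () => void {
  const tick = (): void => {
    refreshOnce(sink, getToken).catch(onError);
  };
  tick();
  const timer = setInterval(tick, intervalMs);
  return () => clearInterval(timer);
}
```

With a real connection this might be started as `const stop = startTokenRefresh(nativeConnection, fetchToken, 5 * 60_000)`, where `fetchToken` is whatever function calls the token service, and `stop()` would be called when the Worker shuts down. The interval should be comfortably shorter than the token lifetime.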

dano commented

Hi @bergundy, I'm a co-worker of @YufeeXing. Let me provide a little more background on why he's trying to do this. Apologies, the explanation is a little bit long!

First a slight correction: he is working on authorizing Workflow and Activity executions, not authenticating them.

Now for the background: Our security team has asked us to do “scoped” authorization of each request to execute a Workflow or Activity. What I mean by "scoped authorization" is that we need to validate that every time a customer sends our application a request that results in Workflows/Activities being executed, the customer is actually authorized to execute each of those specific Workflows/Activities.

However, the way we've been asked to implement this is unusual:

  1. Intercept every API call to Temporal on the client side, determine if it's one we want to do scoped authorization for (all requests to execute workflows or activities), or one we need to do "service-level" authorization for (PollWorkflowTaskQueue/PollActivityTaskQueue), and generate the appropriate authorization token for that type of request.
    • If it's a scoped authZ request
      • Gather metadata about the particular request (e.g. what workflow or activity are you trying to execute) and include that, along with the metadata about the currently logged in user, in a request sent to an authorization service.
      • The authorization service determines if the current user has the necessary permissions, and if it does, returns a signed authorization token to the interceptor.
    • If it's a service-level authZ request
      • Generate a signed authorization token using credentials unique to the Temporal worker making the request
  2. Add whichever token we generated to the gRPC headers (not the Temporal headers) we send to Temporal for that request
  3. Our Temporal Authorizer plugin validates that the token it receives in the gRPC header is valid.

When we initially attempted to implement this in Java, we observed that we were fighting against the flow Temporal wanted us to use, which was to let the Temporal authorizer plugin do the permissions check. Doing it on the client-side wasn't straightforward because the authorizer plugin expects the authorization token to be in a gRPC header, but gRPC interceptors have no access to the Temporal metadata we needed (e.g. the workflow/activity being executed), and the Temporal interceptors (which have access to the metadata) have no access to the gRPC headers.

For reasons I won't get into here (🙂) we were asked to make it work in spite of this. We implemented it in the Java SDK by using Temporal WorkflowClient/Worker Interceptors to intercept every workflow/activity call, called the authorization service in those, and stored the signed token we got back from the authorization service in a thread local (this is a slight simplification, but gets the idea across). The token is pulled out of the thread local by a gRPC interceptor, which adds it to the outgoing gRPC header for the request.

This is hacky, but the only way we could find to make this model work. I am not at all familiar with the Typescript SDK or Typescript in general, but it sounds like @YufeeXing can't find any way to replicate what we did with Java, possibly because there isn't any concept of a gRPC interceptor (and perhaps also because of the threading model used in Typescript). I think this is why @YufeeXing was looking for a way to get access to the NativeConnection instance in Workflow/Activity interceptors.

Can you see any way to make this work, or do we have to go back to our security team and ask them to change their requirements?

Hello, @bergundy . Thanks so much for so many explanations and suggestions above for me.
Just as @dano described, there are 2 types of authZ request, and the same gRPC header key should be used to authorize both types of request.
Any ideas or suggestions about this?

I will find some time to properly respond to this.

What I've seen other companies do is the following:

  1. A client starts a workflow using a token passed via either gRPC or Temporal headers
  2. A custom authorizer on the server translates that token into claims and stores them in the Temporal headers on StartWorkflowExecutionRequest, which in turn get stored in the WorkflowExecutionStarted history event. It's important not to store the token here, so that it can't be reused for other requests; alternatively, use a signed non-replayable token
  3. User code on the worker intercepts workflow start, extracts the claims from Temporal headers, and authorizes the execution
  4. The Temporal claim headers are propagated to child workflows and activities to continue authorization upstream

The code for the interceptor in (3) might look something like this:

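import { ApplicationFailure, defaultPayloadConverter } from '@temporalio/common';
import type { Next, WorkflowExecuteInput, WorkflowInboundCallsInterceptor } from '@temporalio/workflow';

export class AuthorizationInterceptor implements WorkflowInboundCallsInterceptor {
  async execute(input: WorkflowExecuteInput, next: Next<WorkflowInboundCallsInterceptor, 'execute'>): Promise<unknown> {
    // Claims are expected under the `claims` Temporal header
    const encodedClaims = input.headers.claims;
    if (encodedClaims === undefined) {
      throw ApplicationFailure.nonRetryable('Unauthorized');
    }
    // Assumes the claims were encoded with the default payload converter
    const claims = defaultPayloadConverter.fromPayload<{ isAuthorizedForSomeSpecificOperation?: boolean }>(encodedClaims);
    if (!claims.isAuthorizedForSomeSpecificOperation) {
      throw ApplicationFailure.nonRetryable('Unauthorized');
    }
    return next(input);
  }
}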

Note that you will likely have to handle signals, queries, and updates as well.

Apologies for the late response, I was on PTO.

Thanks @bergundy . But there are some differences from my side.

In our use case, we want the gRPC headers to contain a different token for the workflow execution request and for the activity execution request (the activity called by the workflow).

So I am now trying to use the WorkflowInboundCallsInterceptor to change the token in the gRPC header, instead of authorizing the executions.

The activity execution request is generated on workflow task completion via the ScheduleActivityTask command.

If your use case requires separately authorizing each activity execution, you might want to add Temporal headers with the desired credentials via WorkflowOutboundCallsInterceptor.scheduleActivity. Note that these headers are recorded in workflow history, so do not store any reusable auth information there.
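
A minimal sketch of that suggestion follows. To stay self-contained it uses stand-in types (`PayloadLike`, `ActivityInputLike`, `NextLike`) that mirror only the fields it touches; in a real Workflow bundle the class would implement `WorkflowOutboundCallsInterceptor` from `@temporalio/workflow`, and `encode` would typically be a payload converter's `toPayload`. The `claims` header name and the class name are hypothetical.

```typescript
// Stand-in for the SDK's Payload type (only the fields relevant here).
interface PayloadLike {
  metadata?: Record<string, Uint8Array> | null;
  data?: Uint8Array | null;
}

// Stand-in for the input passed to scheduleActivity.
interface ActivityInputLike {
  activityType: string;
  headers: Record<string, PayloadLike>;
}

type NextLike = (input: ActivityInputLike) => Promise<unknown>;

class ActivityClaimsInterceptor {
  private readonly claims: Record<string, unknown>;
  private readonly encode: (value: unknown) => PayloadLike;

  constructor(claims: Record<string, unknown>, encode: (value: unknown) => PayloadLike) {
    this.claims = claims;
    this.encode = encode;
  }

  // Copy the input and add a `claims` Temporal header before delegating.
  // Headers end up in workflow history, so only non-reusable data belongs here.
  async scheduleActivity(input: ActivityInputLike, next: NextLike): Promise<unknown> {
    return next({
      ...input,
      headers: { ...input.headers, claims: this.encode(this.claims) },
    });
  }
}
```

The Worker that picks up the activity could then read the same header in an activity inbound interceptor and decide whether to run it, which keeps the per-activity check out of the gRPC layer entirely.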

So, the suggestion is to add the token to the Temporal headers via WorkflowOutboundCallsInterceptor? @bergundy

Yes

dano commented

@bergundy According to the docs, a ClaimMapper plugin's AuthInfo input parameter is populated using the gRPC "authorization" header:

The Temporal Server expects an authorization gRPC header with an authorization token to be passed with API calls if requests authorization is configured.

It sounds like we'd have to bypass the ClaimMapper plugin altogether, and have the Authorizer pull the Temporal header out of the request via authorization.CallTarget.Request, correct? I'm not familiar with the ClaimMapper/Authorizer plugins, so I apologize if this is a very basic question.

You would likely want the worker that executes the activity to authorize running it; the authorizer on the server does not interpret commands, only gRPC requests.

Hi @bergundy, I am working with @dano and @YufeeXing to resolve this issue. My team specifically is working on the Temporal authorizer plugins on the server side of this. Based on the previous conversation, the suggestion is to use Temporal interceptors to add the token to the Temporal headers, which are part of the Temporal API request. Based on some analysis on our end, when we added custom headers from Temporal interceptors configured in the client SDK, these headers arrived in the authorization.CallTarget.Request object of the authorizer plugin's request, which is of type interface{}, as shown in the specs below -
image

So for example when temporal API requests were logged out in plugin code we were able to see these test custom headers in the request object. As depicted below "CUSTOM_KEY" is the test header that we set using temporal interceptor -
image

Is this the right way based on previous suggestions to add auth tokens and parse/intercept them in the authorizer plugin (server side)? However, it seems like due to the generic format of authorization.CallTarget.Request as interface{} it is really difficult to parse/deserialize these request objects at runtime. If this is the right way is there an easy/recommended way to parse/deserialize the authorization.CallTarget.Request object to read in whatever custom headers we are setting in each request?

You should be able to cast the request to the proper type based on the APIName.

You may also want to consider implementing this functionality as a proxy in front of the server instead.
Look at this test showing how to intercept and modify request payloads: https://github.com/temporalio/api-go/blob/7f76d854ed02bf60563f779e9e6634cba376746d/proxy/interceptor_test.go#L143.