
Workers and Task Queues in TypeScript

@temporalio/worker NPM API reference | GitHub

Background reading: Workers in Temporal

What is a Worker?

A Worker is a process that connects to the Temporal Server, polls Task Queues for Tasks sent from Clients, and executes Workflows and Activities in response.

  • Workers host Workflows and Activities.
    • TypeScript SDK Workers bundle Workflows based on workflowsPath with Webpack and run them inside V8 isolates.
    • TypeScript SDK Workers run Activities directly inside the normal Node.js environment.
  • Workers are extremely scalable.
    • Workers connect to the Temporal Server, poll their configured Task Queue for Tasks, execute chunks of code in response to those Tasks, and then communicate the results back.
    • Workers are distinct from Clients and scaled independently of Temporal Server, which has its own internal services to scale.
    • Workers are stateless, and can be brought up and down at any time with no Temporal data loss impact. To migrate to new versions of your Workflows and Activities, you restart your Workers with the new versions (and optionally use the patch API to migrate still-running Workflows of the older version).
    • Use the @temporalio/worker package's Worker class to create and run as many Workers as your use case demands, across any number of hosts.
  • Workers are run on user-controlled hosts. This is an important security feature which means Temporal Server (or Temporal Cloud) never executes your Workflow or Activity code, and that Workers can have different hardware (e.g. custom GPUs for Machine Learning) than the rest of the system.
Your Workflows will only progress if there are Workers polling the right Task Queues, and they must have the right Workflows and Activities registered to execute those Tasks.

TypeScript's type checking cannot completely protect you from typos, such as a mistyped Task Queue name. If you are experiencing issues, you can check the status of Workers and the Task Queues they poll with tctl or the Temporal Web UI.

Temporal Web Task Queues view
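
One way to reduce the risk of a mistyped Task Queue name is to define the name once and reuse it wherever a Worker or a Client needs it. The sketch below is only an illustration: TASK_QUEUE_NAME, workerOptions, and workflowStartOptions are hypothetical names, not part of the SDK.

```typescript
// Hypothetical shared module: the Task Queue name is defined exactly once.
const TASK_QUEUE_NAME = 'hello-world';

// Options a Worker would be created with (all other options omitted).
function workerOptions(): { taskQueue: string } {
  return { taskQueue: TASK_QUEUE_NAME };
}

// Options a Client would start a Workflow with.
function workflowStartOptions(workflowId: string): { taskQueue: string; workflowId: string } {
  return { taskQueue: TASK_QUEUE_NAME, workflowId };
}
```

Because both option objects read the same constant, a typo can no longer make the Worker and the Client disagree about the Task Queue.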

How to develop a Worker

Create a Worker with Worker.create() (which establishes the initial gRPC connection), then call worker.run() on it (to start polling the Task Queue).

Below is an example of starting a Worker that polls the Task Queue named hello-world.

import { Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  // Step 1: Register Workflows and Activities with the Worker and connect to
  // the Temporal server.
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'hello-world',
  });
  // Worker connects to localhost by default and uses console.error for logging.
  // Customize the Worker by passing more options to create():
  // https://typescript.temporal.io/api/classes/worker.Worker
  // If you need to configure server connection parameters, see docs:
  // https://docs.temporal.io/typescript/security#encryption-in-transit-with-mtls

  // Step 2: Start accepting tasks on the `hello-world` queue
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

taskQueue is the only required option, but you will also use workflowsPath and activities to register Workflows and Activities with the Worker.

A full example for Workers looks like this:

import { Worker, NativeConnection } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  // clientCert and clientKey are not defined in this excerpt; they are
  // expected to hold the client certificate and private key, loaded elsewhere.
  const connection = await NativeConnection.connect({
    // defaults port to 7233 if not specified
    address: 'foo.bar.tmprl.cloud',
    tls: {
      // use `tls: true` instead of this object for TLS without mTLS
      // See docs for other TLS options
      clientCertPair: {
        crt: clientCert,
        key: clientKey,
      },
    },
  });

  const worker = await Worker.create({
    connection,
    namespace: 'foo.bar', // as explained in Namespaces section
    // ...
  });
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

See below for more Worker options.

Workflow and Activity registration

Workers bundle Workflow code and node_modules using Webpack v5 and execute them inside V8 isolates. Activities are directly required and run by Workers in the Node.js environment.

Workers are very flexible – you can host any or all of your Workflows and Activities on a Worker, and you can host multiple Workers in a single machine.

There are three main things the Worker needs:

  • taskQueue: the Task Queue to poll. This is the only required argument.
  • activities: Optional. Imported and supplied directly to the Worker. Not the path.
  • Workflow bundle:
  • Either specify a workflowsPath to your workflows.ts file to pass to Webpack, e.g., require.resolve('./workflows'). Workflows will be bundled with their dependencies.
  • Or pass a prebuilt bundle to workflowBundle instead if you prefer to handle the bundling yourself.

Additional Worker Options

This is a selected subset of options you are likely to use. Even more advanced options, particularly for performance tuning, are available in the API reference.

Option | Description
dataConverter | Encodes and decodes data entering and exiting a Temporal Server. Supports undefined, Uint8Array, and JSON.
sinks | Allows injection of Workflow Sinks (Advanced feature: see Logging docs)
interceptors | A mapping of interceptor type to a list of factories or module paths (Advanced feature: see Interceptors)

Operation guides:

The Worker package embeds the Temporal Rust Core SDK. It comes pre-compiled for most installations.

We've provided pre-compiled binaries for:

  • Mac with an Intel chip: x86_64-apple-darwin
  • Mac with an Apple chip: aarch64-apple-darwin
  • Linux with x86_64 architecture: x86_64-unknown-linux-gnu
  • Linux with aarch64 architecture: aarch64-unknown-linux-gnu
  • Windows with x86_64 architecture: x86_64-pc-windows-gnu (Windows is not yet supported but it is a priority for us).

If you need to compile the Worker yourself, set up the Rust toolchain by following the instructions here.

Prebuilt Workflow Bundles

Advanced users can pass a prebuilt bundle instead of workflowsPath, or you can use Temporal's bundleWorkflowCode helper:

import { bundleWorkflowCode, Worker } from '@temporalio/worker';

// Use one of the two options below. taskQueue is defined elsewhere.

// Option 1: passing path to prebuilt bundle
const workerFromPrebuiltBundle = await Worker.create({
  taskQueue,
  workflowBundle: {
    codePath: './path-to-bundle.js',
    sourceMapPath: './path-to-bundle.js.map',
  },
});

// Option 2: bundling code using Temporal's bundler settings
const workflowBundle = await bundleWorkflowCode({
  workflowsPath: require.resolve('./path-to-your-workflows'),
});
const workerFromBundledCode = await Worker.create({
  taskQueue,
  workflowBundle,
});

How to shut down a Worker and track its state

Workers shut down if they receive any of these shutdownSignals: ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2']. In development, we shut down Workers with Ctrl-C (SIGINT) or nodemon (SIGUSR2). In production, we usually want to give Workers a shutdownGraceTime long enough for them to finish any in-progress Activities. As soon as they receive a shutdown signal or request, the Worker stops polling for new Tasks and allows in-flight Tasks to complete until shutdownGraceTime is reached. Any Activities that are still running at that time will stop running, and will be rescheduled by Temporal Server when an Activity timeout occurs.

We may want to programmatically shut down Workers (with worker.shutdown()) in integration tests or when automating a fleet of Workers.
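
As a rough sketch of programmatic shutdown, the helper below runs a Worker only for as long as some piece of work (for example, an integration test body) is in progress, then calls shutdown() and waits for run() to resolve. The ShutdownCapableWorker interface and the runWorkerWhile helper are hypothetical names introduced here; the interface covers only the run() and shutdown() methods mentioned above, which a real Worker is assumed to satisfy.

```typescript
// Minimal structural type with only the two Worker methods used here.
// Assumption: a Worker created with Worker.create() satisfies this shape.
interface ShutdownCapableWorker {
  run(): Promise<void>;
  shutdown(): void;
}

// Hypothetical helper: start polling, do the work, then shut the Worker down
// and wait until run() has resolved.
async function runWorkerWhile<T>(
  worker: ShutdownCapableWorker,
  work: () => Promise<T>,
): Promise<T> {
  const running = worker.run();
  // Avoid an unhandled rejection if run() fails while `work` is still pending;
  // the error is still surfaced by `await running` below.
  running.catch(() => undefined);
  try {
    return await work();
  } finally {
    worker.shutdown();
    await running;
  }
}
```

In a test, `work` would typically start a Workflow through a Client and await its result, so the Worker exists only for the duration of that test.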

Worker states

At any point in time, we can check a Worker's state with worker.getState(). A Worker is in one of 7 states at any given point:

  • INITIALIZED - The initial state of the Worker after calling Worker.create and successful connection to the server
  • RUNNING - worker.run() was called, polling Task Queues
  • FAILED - Worker encountered an unrecoverable error, worker.run() should reject with the error
  • The last 4 states are related to the Worker shutdown process:
    • STOPPING - worker.shutdown() was called or a shutdown signal was received, Worker will forcefully shut down after shutdownGraceTime
    • DRAINING - Core has indicated that shutdown is complete and all Workflow tasks have been drained, waiting for Activities to finish and cached Workflows to be evicted
    • DRAINED - All activities and Workflows have completed, ready to shutdown
    • STOPPED - Shutdown complete, worker.run() resolves
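
The state list above can be modeled as a small union type, which makes it easy to branch on the result of worker.getState(). This is a sketch under the assumption that getState() returns exactly the state names listed; WorkerState, isInShutdownProcess, and hasFinished are hypothetical names, not SDK exports.

```typescript
// The seven states listed above, written as string literals (assumed to match
// the values returned by worker.getState()).
type WorkerState =
  | 'INITIALIZED'
  | 'RUNNING'
  | 'FAILED'
  | 'STOPPING'
  | 'DRAINING'
  | 'DRAINED'
  | 'STOPPED';

// The last four states belong to the shutdown process.
const SHUTDOWN_STATES: ReadonlySet<WorkerState> = new Set<WorkerState>([
  'STOPPING',
  'DRAINING',
  'DRAINED',
  'STOPPED',
]);

// True once shutdown has started, including after it has completed.
function isInShutdownProcess(state: WorkerState): boolean {
  return SHUTDOWN_STATES.has(state);
}

// True when run() has settled: resolved (STOPPED) or rejected (FAILED).
function hasFinished(state: WorkerState): boolean {
  return state === 'STOPPED' || state === 'FAILED';
}
```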

If you need even more visibility into internal Worker state, see the API reference.

Rust Core and Worker Networking

In development, the TypeScript SDK usually handles all of the communication between the Worker and the Temporal Server behind the scenes.

In production settings, you can configure the address and namespace the Worker speaks to via the Rust Core SDK NativeConnection, and configure the Core Runtime with RuntimeOptions:

import {
  Worker,
  DefaultLogger,
  Runtime,
  NativeConnection,
} from '@temporalio/worker';

const logger = new DefaultLogger('DEBUG');
Runtime.install({
  logger,
  telemetryOptions: { logForwardingLevel: 'INFO' },
});
const connection = await NativeConnection.connect({
  address: 'temporal.myorg.io',
});
const worker = await Worker.create({
  connection,
  namespace: 'your-custom-namespace',
  /* standard Worker options from here */
});

Temporal also supports mTLS encryption (required by Temporal Cloud) this way - please read our Security docs for more information.

Task Queues

A Task Queue is a lightweight, dynamically allocated queue in Temporal Server that one or more Workers (Worker Entities) poll for Tasks.

Task Queues do not have any ordering guarantees. It is possible to have a Task that stays in a Task Queue for a period of time, if there is a backlog that wasn't drained for that time.

There are two types of Task Queues, Activity Task Queues and Workflow Task Queues.

Task Queue component

Task Queues are very lightweight components. Task Queues do not require explicit registration but instead are created on demand when a Workflow Execution or Activity spawns or when a Worker Process subscribes to it. When a Task Queue is created, both a Workflow Task Queue and an Activity Task Queue are created under the same name. There is no limit to the number of Task Queues a Temporal Application can use or a Temporal Cluster can maintain.

Workers poll for Tasks in Task Queues via synchronous RPC. This implementation offers several benefits:

  • A Worker Process polls for a message only when it has spare capacity, avoiding overloading itself.
  • In effect, Task Queues enable load balancing across many Worker Processes.
  • Task Queues enable what we call Task Routing, which is the routing of specific Tasks to specific Worker Processes or even a specific process.
  • Task Queues support server-side throttling, which enables you to limit the Task dispatching rate to the pool of Worker Processes while still supporting Task dispatching at higher rates when spikes happen.
  • When all Worker Processes are down, messages simply persist in a Task Queue, waiting for the Worker Processes to recover.
  • Worker Processes do not need to advertise themselves through DNS or any other network discovery mechanism.
  • Worker Processes do not need to have any open ports, which is more secure.
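
The first and fifth points above (Workers pull only when they have spare capacity, and undelivered Tasks simply wait) can be illustrated with a toy in-memory model. This is not how Temporal Server is implemented; ToyWorker, pollOnce, and the sample data are hypothetical and exist only to show the pull-based idea.

```typescript
// Toy, in-memory model of pull-based dispatch. All names are hypothetical.
interface ToyWorker {
  name: string;
  capacity: number; // maximum number of Tasks handled at once
  inFlight: string[]; // Tasks currently being handled
}

// One polling round: each Worker pulls Tasks only while it has spare capacity.
// Tasks that nobody has room for stay in the queue.
function pollOnce(queue: string[], workers: ToyWorker[]): void {
  for (const worker of workers) {
    while (worker.inFlight.length < worker.capacity && queue.length > 0) {
      const task = queue.shift();
      if (task !== undefined) worker.inFlight.push(task);
    }
  }
}

const toyQueue = ['t1', 't2', 't3', 't4', 't5'];
const toyWorkers: ToyWorker[] = [
  { name: 'big', capacity: 2, inFlight: [] },
  { name: 'small', capacity: 1, inFlight: [] },
];
pollOnce(toyQueue, toyWorkers);
// 'big' now holds t1 and t2, 'small' holds t3, and t4 and t5 wait in the queue.
```

Because the Workers pull rather than receive pushes, no Worker is handed more than it asked for, and the backlog drains as capacity frees up.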

All Workers listening to a given Task Queue must have identical registrations of Activities and/or Workflows. The one exception is during a Server upgrade, where it is okay to have registration temporarily misaligned while the binary rolls out.

Where to set Task Queues

There are four places where the name of the Task Queue can be set by the developer.

  1. A Task Queue must be set when spawning a Workflow Execution.
  2. A Task Queue name must be set when creating a Worker Entity and when running a Worker Process.

Note that all Worker Entities listening to the same Task Queue name must be registered to handle the exact same Workflow Types and Activity Types.

If a Worker Entity polls a Task for a Workflow Type or Activity Type it does not know about, it will fail that Task. However, the failure of the Task will not cause the associated Workflow Execution to fail.

  3. A Task Queue name can be provided when spawning an Activity Execution.

This is optional. An Activity Execution inherits the Task Queue name from its Workflow Execution if one is not provided.

  4. A Task Queue name can be provided when spawning a Child Workflow Execution.

This is optional. A Child Workflow Execution inherits the Task Queue name from its Parent Workflow Execution if one is not provided.
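
The inheritance rule for Activity Executions and Child Workflow Executions amounts to a simple fallback. The following toy model only illustrates that rule; ExecutionContext and resolveTaskQueue are hypothetical names and not part of any Temporal SDK.

```typescript
// Toy model of Task Queue inheritance; not SDK code.
interface ExecutionContext {
  taskQueue: string; // Task Queue of the (Parent) Workflow Execution
}

// Returns the explicitly requested Task Queue name, or falls back to the
// Task Queue of the Workflow Execution doing the spawning.
function resolveTaskQueue(parent: ExecutionContext, requested?: string): string {
  return requested ?? parent.taskQueue;
}

const parentExecution: ExecutionContext = { taskQueue: 'main-queue' };
const inheritedQueue = resolveTaskQueue(parentExecution); // 'main-queue'
const overriddenQueue = resolveTaskQueue(parentExecution, 'gpu-queue'); // 'gpu-queue'
```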

Where Task Queues are used

In Temporal, a Task Queue is represented in code by its name as a string.

There are two main places where the name of the Task Queue is supplied by the developer.

When scheduling a Workflow, a taskQueue must be specified.

import { Connection, WorkflowClient } from '@temporalio/client';
// yourWorkflow is imported from your own Workflow definitions (not shown)

const connection = await Connection.connect();
const client = new WorkflowClient({ connection });
const result = await client.execute(yourWorkflow, {
  // required
  taskQueue: 'testhttp',
  workflowId: 'business-meaningful-id',
});

When creating a Worker, you must pass the taskQueue option to the Worker.create() function.

const worker = await Worker.create({
  // imported elsewhere
  activities,
  taskQueue: 'your-task-queue',
});

Optionally, in Workflow code, you can specify a Task Queue when calling an Activity or starting a Child Workflow by passing the taskQueue option to proxyActivities() or to startChild/executeChild. If you do not specify a taskQueue, the TypeScript SDK places Activity and Child Workflow Tasks in the same Task Queue as the Workflow that spawned them.

Example: Sticky Activities

Any Worker that polls a Task Queue is allowed to pick up the next task; sometimes this is undesirable because you want tasks to execute sequentially on the same machine.

Fortunately, there is a design pattern for this we call Sticky Activities. Because Task Queues are dynamically created and very lightweight, you can use them for task routing by creating a new Task Queue per machine.

The main strategy is:

  1. Create a getUniqueTaskQueue Activity that generates a unique Task Queue name (for example, uniqueWorkerTaskQueue). It doesn't matter where this Activity runs, so it can be "non sticky", as per Temporal's default behavior.
  2. For Activities intended to be sticky, register them in one Worker, and make that Worker the only one listening on uniqueWorkerTaskQueue.
    • Multiple Workers can be created inside the same process.
  3. Execute Workflows from the Client like normal.
    • Activities will execute in sequence on the same machine because they are all routed by the uniqueWorkerTaskQueue.

Workflow Code:

activities-sticky-queues/src/workflows.ts

// Imports are omitted in the original excerpt; the './activities' path is assumed.
import { proxyActivities, uuid4 } from '@temporalio/workflow';
import type { createNonStickyActivities, createStickyActivities } from './activities';

const { getUniqueTaskQueue } = proxyActivities<ReturnType<typeof createNonStickyActivities>>({
  startToCloseTimeout: '1 minute',
});

export async function fileProcessingWorkflow(maxAttempts = 5): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; ++attempt) {
    try {
      const uniqueWorkerTaskQueue = await getUniqueTaskQueue();
      const activities = proxyActivities<ReturnType<typeof createStickyActivities>>({
        taskQueue: uniqueWorkerTaskQueue,
        // Note the use of scheduleToCloseTimeout.
        // The reason this timeout type is used is because this task queue is unique
        // to a single worker. When that worker goes away, there won't be a way for these
        // activities to progress.
        scheduleToCloseTimeout: '1 minute',
      });

      const downloadPath = `/tmp/${uuid4()}`;
      await activities.downloadFileToWorkerFileSystem('https://temporal.io', downloadPath);
      try {
        await activities.workOnFileInWorkerFileSystem(downloadPath);
      } finally {
        await activities.cleanupFileFromWorkerFileSystem(downloadPath);
      }
      return;
    } catch (err) {
      if (attempt === maxAttempts) {
        console.log(`Final attempt (${attempt}) failed, giving up`);
        throw err;
      }

      console.log(`Attempt ${attempt} failed, retrying on a new Worker`);
    }
  }
}

Worker Code:

activities-sticky-queues/src/worker.ts

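// Imports are omitted in the original excerpt; the 'uuid' package and the
// './activities' path are assumed.
import { Worker } from '@temporalio/worker';
import { v4 as uuid } from 'uuid';
import { createNonStickyActivities, createStickyActivities } from './activities';

async function run() {
  const uniqueWorkerTaskQueue = uuid();

  const workers = await Promise.all([
    Worker.create({
      workflowsPath: require.resolve('./workflows'),
      activities: createNonStickyActivities(uniqueWorkerTaskQueue),
      taskQueue: 'sticky-activity-tutorial',
    }),
    Worker.create({
      // No workflows for this queue
      activities: createStickyActivities(),
      taskQueue: uniqueWorkerTaskQueue,
    }),
  ]);
  await Promise.all(workers.map((w) => w.run()));
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});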
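
The Workflow and Worker code above call createNonStickyActivities and createStickyActivities, which are not shown. A minimal sketch of what such factories might look like follows. The function and Activity names are taken from the code above, but the bodies are placeholders: the in-memory Map stands in for the Worker's file system and is purely an assumption made to keep the example self-contained.

```typescript
// Hypothetical activity factories; the bodies are placeholders, not the
// sample's real implementation.

// The non-sticky Activity just reports the unique Task Queue name of the
// Worker process that happened to run it.
const createNonStickyActivities = (uniqueWorkerTaskQueue: string) => ({
  async getUniqueTaskQueue(): Promise<string> {
    return uniqueWorkerTaskQueue;
  },
});

// The sticky Activities share local state, so they must all run on the same
// Worker. Here a Map stands in for that Worker's file system (assumption).
const createStickyActivities = (files: Map<string, string> = new Map()) => ({
  async downloadFileToWorkerFileSystem(url: string, path: string): Promise<void> {
    files.set(path, `contents of ${url}`);
  },
  async workOnFileInWorkerFileSystem(path: string): Promise<void> {
    if (!files.has(path)) {
      throw new Error(`file not found on this Worker: ${path}`);
    }
  },
  async cleanupFileFromWorkerFileSystem(path: string): Promise<void> {
    files.delete(path);
  },
});
```

The second factory shows why stickiness matters: workOnFileInWorkerFileSystem only succeeds on the Worker whose local state already contains the downloaded file.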

This pattern is in use at Netflix. Note that this is unrelated to Sticky Queues, which are an internal implementation detail.