Application development - Observability
The observability section of the Temporal Application development guide covers the many ways to view the current state of your Temporal Application—that is, ways to view which Workflow Executions are tracked by the Temporal Platform and the state of any specified Workflow Execution, either currently or at points of an execution.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Application development guide, it could be in older docs for SDKs.
This section covers features related to viewing the state of the application, including:

- Metrics
- Tracing
- Logging
- Visibility
Metrics
Each Temporal SDK is capable of emitting an optional set of metrics from either the Client or the Worker process. For a complete list of metrics capable of being emitted, see the SDK metrics reference.
Metrics can be scraped and stored in time series databases such as Prometheus.
Temporal also provides a dashboard you can integrate with graphing services like Grafana. For more information, see:
- Temporal's implementation of the Grafana dashboard
- How to export metrics in Grafana
- Go
- Java
- PHP
- Python
- TypeScript
To emit metrics from the Temporal Client in Go, create a metrics handler from the Client Options and specify a listener address to be used by Prometheus.
client.Options{
	MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{
		ListenAddress: "0.0.0.0:9090",
		TimerType:     "histogram",
	})),
}
The Go SDK currently supports the Tally library; Tally offers extensible custom metrics reporting, which is exposed through the WithCustomMetricsReporter API.
For more information, see the Go sample for metrics.
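The client options above reference a newPrometheusScope helper. The following is a sketch of that helper, along the lines of the Go metrics sample; the one-second reporting interval and sanitize options mirror the sample and are not the only valid choices:

import (
	"log"
	"time"

	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/uber-go/tally/v4"
	"github.com/uber-go/tally/v4/prometheus"
	sdktally "go.temporal.io/sdk/contrib/tally"
)

func newPrometheusScope(c prometheus.Configuration) tally.Scope {
	reporter, err := c.NewReporter(
		prometheus.ConfigurationOptions{
			Registry: prom.NewRegistry(),
			OnError: func(err error) {
				log.Println("error in prometheus reporter", err)
			},
		},
	)
	if err != nil {
		log.Fatalln("error creating prometheus reporter", err)
	}
	scopeOpts := tally.ScopeOptions{
		CachedReporter:  reporter,
		Separator:       prometheus.DefaultSeparator,
		SanitizeOptions: &sdktally.PrometheusSanitizeOptions,
	}
	scope, _ := tally.NewRootScope(scopeOpts, time.Second)
	// Align Tally metric names with Prometheus naming conventions.
	return sdktally.NewPrometheusNamingScope(scope)
}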
To emit metrics with the Java SDK, use the MicrometerClientStatsReporter class to integrate with a Micrometer MeterRegistry configured for your metrics backend. Micrometer is a popular Java framework that provides integration with Prometheus and other backends.

The following example shows how to use MicrometerClientStatsReporter to define the metrics scope and set it with the WorkflowServiceStubsOptions.
//...
// See the Micrometer documentation for configuration details on other supported monitoring systems.
// This example shows how to set up the Prometheus registry and stats reporter.
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
StatsReporter reporter = new MicrometerClientStatsReporter(registry);
// set up a new scope, report every 10 seconds
Scope scope = new RootScopeBuilder()
        .reporter(reporter)
        .reportEvery(com.uber.m3.util.Duration.ofSeconds(10));
// for Prometheus collection, expose a scrape endpoint.
//...
// add metrics scope to WorkflowServiceStub options
WorkflowServiceStubsOptions stubOptions =
        WorkflowServiceStubsOptions.newBuilder().setMetricsScope(scope).build();
//...
For more details, see the Java SDK Samples. For details on configuring a Prometheus scrape endpoint with Micrometer, see https://micrometer.io/docs/registry/prometheus#_configuring.
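As an illustration, one way to expose that scrape endpoint is with the JDK's built-in HttpServer, serving the text produced by the PrometheusMeterRegistry created above. This is a minimal sketch; the port 8080 and the /metrics path are arbitrary choices:

import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;

//...
// Expose the registry's contents for Prometheus to scrape.
HttpServer scrapeServer = HttpServer.create(new InetSocketAddress(8080), 0);
scrapeServer.createContext("/metrics", exchange -> {
    byte[] body = registry.scrape().getBytes();
    exchange.sendResponseHeaders(200, body.length);
    try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
    }
});
scrapeServer.start();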
Content is currently unavailable.
Workers can emit metrics and traces. There are a few telemetry options that can be provided to Runtime.install. The common options are:

- metrics: { otel: { url } }: The URL of a gRPC OpenTelemetry collector.
- metrics: { prometheus: { bindAddress } }: Address on the Worker host that will have metrics for Prometheus to scrape.
To set up tracing of Workflows and Activities, use our opentelemetry-interceptors package.
import { Runtime } from '@temporalio/worker';

Runtime.install({
  telemetryOptions: {
    metrics: {
      prometheus: { bindAddress: '0.0.0.0:9464' },
    },
    logging: { forward: { level: 'DEBUG' } },
  },
});
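For the otel option from the list above, a corresponding sketch looks like this; the URL is a placeholder for your collector's gRPC endpoint:

import { Runtime } from '@temporalio/worker';

Runtime.install({
  telemetryOptions: {
    metrics: {
      otel: { url: 'grpc://localhost:4317' },
    },
  },
});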
Tracing
Tracing allows you to view the call graph of a Workflow along with its Activities and any child Workflows.
Temporal Web's tracing capabilities mainly track Activity Execution within a Temporal context. If you need custom tracing specific for your use case, you should make use of context propagation to add tracing logic accordingly.
For information about Workflow tracing, see Tracing Temporal Workflows with DataDog.
For information about how to configure exporters and instrument your code, see Tracing Temporal Services with OTEL.
- Go
- Java
- PHP
- Python
- TypeScript
The Go SDK provides support for distributed tracing through OpenTracing. Tracing allows you to view the call graph of a Workflow along with its Activities and any child Workflows.
Tracing can be configured by providing an opentracing.Tracer implementation in ClientOptions during client instantiation.
For more details on how to configure and leverage tracing, see the OpenTracing documentation.
The OpenTracing support has been validated using Jaeger, but other implementations mentioned here should also work.
Tracing functionality utilizes generic context propagation provided by the Client.
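As a sketch only, and assuming an SDK version where ClientOptions exposes a Tracer field as described above (newer Go SDK releases configure tracing through interceptor packages instead), wiring in a tracer could look like this; constructing the Jaeger (or other) tracer is elided:

// `tracer` is any configured opentracing.Tracer implementation.
c, err := client.Dial(client.Options{
	Tracer: tracer,
})
if err != nil {
	log.Fatalln("unable to create Temporal client", err)
}
defer c.Close()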
Content is currently unavailable.
Content is currently unavailable.
The interceptors-opentelemetry sample shows how to use the SDK's built-in OpenTelemetry tracing to trace everything from starting a Workflow to Workflow Execution to running an Activity from that Workflow.

The built-in tracing uses protobuf message headers (like this one when starting a Workflow) to propagate the tracing information from the client to the Workflow and from the Workflow to its successors (when Continued As New), children, and Activities.

All of these executions are linked with a single trace identifier and have the proper parent -> child span relation.

Tracing is compatible between different Temporal SDKs as long as compatible context propagators are used.
Context propagation
The TypeScript SDK uses the global OpenTelemetry propagator.
To extend the default (Trace Context and Baggage propagators) to also include the Jaeger propagator, follow these steps:
npm i @opentelemetry/propagator-jaeger
At the top level of your Workflow code, add the following lines:
import { propagation } from "@opentelemetry/api";
import {
  CompositePropagator,
  W3CTraceContextPropagator,
  W3CBaggagePropagator,
} from "@opentelemetry/core";
import { JaegerPropagator } from "@opentelemetry/propagator-jaeger";

propagation.setGlobalPropagator(
  new CompositePropagator({
    propagators: [
      new W3CTraceContextPropagator(),
      new W3CBaggagePropagator(),
      new JaegerPropagator(),
    ],
  })
);
Similarly, you can customize the OpenTelemetry NodeSDK propagators by following the instructions in the Initialize the SDK section of the README.
Logging
Send logs and errors to a logging service, so that when things go wrong, you can see what happened.
The SDK core uses WARN for its default logging level.
- Go
- Java
- PHP
- Python
- TypeScript
In Workflow Definitions, you can use workflow.GetLogger(ctx) to write logs.
import (
	"context"
	"time"

	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/workflow"
)

// Workflow is a standard workflow definition.
// Note that the Workflow and Activity don't need to care that
// their inputs/results are being compressed.
func Workflow(ctx workflow.Context, name string) (string, error) {
	// ... (ao is an ActivityOptions value defined in the elided setup code)
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Getting the logger from the context.
	logger := workflow.GetLogger(ctx)
	// Logging a message with the key-value pair `name` and `name`
	logger.Info("Compressed Payloads workflow started", "name", name)

	info := map[string]string{
		"name": name,
	}

	// ... (an elided Activity call takes `info` and fills in `result`)
	var result string

	logger.Info("Compressed Payloads workflow completed.", "result", result)

	return result, nil
}
Content is currently unavailable.
Content is currently unavailable.
You can log from a Workflow using Python's standard logging module (import logging).

Configure logging at the level you want to expose. The following example sets the logging level to INFO.
logging.basicConfig(level=logging.INFO)
Then, in your Workflow, log through workflow.logger at the desired level. The following example logs from the Workflow at the info level.
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info(f"Running workflow with parameter {name}")
        # your_activity is an Activity defined elsewhere.
        return await workflow.execute_activity(
            your_activity, name, start_to_close_timeout=timedelta(seconds=10)
        )
The following is an example output:
INFO:temporalio.workflow:Running workflow with parameter Temporal ({'attempt': 1, 'namespace': 'your-custom-namespace', 'run_id': 'your-run-id', 'task_queue': 'your-task-queue', 'workflow_id': 'your-workflow-id', 'workflow_type': 'SayHelloWorkflow'})
Logs are skipped during replay by default.
Logging from Workflows is tricky for two reasons:
- Workflows run in a sandboxed environment and cannot do any I/O.
- Workflow code might get replayed at any time, generating duplicate log messages.
To work around these limitations, we recommend using the Sinks feature in the TypeScript SDK. Sinks enable one-way export of logs, metrics, and traces from the Workflow isolate to the Node.js environment.
Sinks are written as objects with methods. Similar to Activities, they are declared in the Worker and then proxied in Workflow code, and it helps to share types between both.
Comparing Sinks, Activities and Interceptors
- Sink functions don't return any value back to the Workflow and cannot be awaited.
- Sink calls are not recorded in Workflow histories (no timeouts or retries).
- Sink functions are always run on the same Worker that runs the Workflow they are called from.
Declaring the Sink Interface
Explicitly declaring a Sink's interface is optional, but is useful for ensuring type safety in subsequent steps:
packages/test/src/workflows/definitions.ts
- TypeScript
- JavaScript
import { Sinks } from '@temporalio/workflow';

export interface LoggerSinks extends Sinks {
  logger: {
    info(message: string): void;
  };
}
// Not required in JavaScript
Implementing Sinks
Implementing Sinks is a two-step process.
Implement and inject the Sink function into a Worker
- TypeScript
- JavaScript
import { Worker, InjectedSinks } from '@temporalio/worker';
import { LoggerSinks } from './workflows';

async function main() {
  const sinks: InjectedSinks<LoggerSinks> = {
    logger: {
      info: {
        fn(workflowInfo, message) {
          console.log('workflow: ', workflowInfo.runId, 'message: ', message);
        },
        callDuringReplay: false, // The default
      },
    },
  };
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    taskQueue: 'logging-sinks',
    sinks,
  });
  await worker.run();
  console.log('Worker gracefully shutdown');
}

main().then(
  () => void process.exit(0),
  (err) => {
    console.error(err);
    process.exit(1);
  }
);
import { Worker } from '@temporalio/worker';

async function main() {
  const sinks = {
    logger: {
      info: {
        fn(workflowInfo, message) {
          console.log('workflow: ', workflowInfo.runId, 'message: ', message);
        },
        callDuringReplay: false, // The default
      },
    },
  };
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    taskQueue: 'logging-sinks',
    sinks,
  });
  await worker.run();
  console.log('Worker gracefully shutdown');
}

main().then(() => void process.exit(0), (err) => {
  console.error(err);
  process.exit(1);
});
- Sink function implementations are passed as an object into WorkerOptions.
- You can specify whether you want the injected function to be called during Workflow replay by setting the callDuringReplay boolean option.
Proxy and call a Sink function from a Workflow
packages/test/src/workflows/log-sample.ts
- TypeScript
- JavaScript
import * as wf from '@temporalio/workflow';
import { LoggerSinks } from './definitions';

const { logger } = wf.proxySinks<LoggerSinks>();

export async function logSampleWorkflow(): Promise<void> {
  logger.info('Workflow execution started');
}
import * as wf from '@temporalio/workflow';

const { logger } = wf.proxySinks();

export async function logSampleWorkflow() {
  logger.info('Workflow execution started');
}
Some important features of the InjectedSinkFunction interface:

- Injected WorkflowInfo argument: The first argument of a Sink function implementation is a workflowInfo object that contains useful metadata.
- Limited argument types: The remaining Sink function arguments are copied between the sandbox and the Node.js environment using the structured clone algorithm.
- No return value: To prevent breaking determinism, Sink functions cannot return values to the Workflow.
Advanced: Performance considerations and non-blocking Sinks
The injected sink function contributes to the overall Workflow Task processing duration.

- If you have a long-running sink function, such as one that tries to communicate with external services, you might start seeing Workflow Task timeouts.
- The effect is multiplied when using callDuringReplay: true and replaying long Workflow histories, because the Workflow Task timer starts when the first history page is delivered to the Worker.
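One way to keep sink calls fast is to buffer entries in memory and export them out-of-band, so the Workflow Task never waits on I/O. The following is a minimal sketch of that idea using the LoggerSinks interface declared earlier; the buffer and flush interval are illustrative choices, not SDK APIs:

import { InjectedSinks } from '@temporalio/worker';
import { LoggerSinks } from './workflows';

const buffer: string[] = [];

// Flush the buffer asynchronously; replace console.log with a real exporter
// (file append, HTTP request, and so on).
setInterval(() => {
  while (buffer.length > 0) {
    console.log(buffer.shift());
  }
}, 1000).unref();

const sinks: InjectedSinks<LoggerSinks> = {
  logger: {
    info: {
      fn(_workflowInfo, message) {
        buffer.push(message); // fast and synchronous: no I/O on the Task path
      },
    },
  },
};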
Custom logger
Use a custom logger for logging.
- Go
- Java
- PHP
- Python
- TypeScript
The Logger field of client.Options sets a custom Logger that is used for all logging actions of that Temporal Client instance.

Although the Go SDK does not support most third-party logging solutions natively, our friends at Banzai Cloud built the adapter package Logur, which makes it possible to use third-party loggers with minimal overhead. Most of the popular logging solutions already have adapters in Logur, but you can find a full list in the Logur GitHub project.
Here is an example of using Logur to support Logrus:
package main

import (
	"go.temporal.io/sdk/client"

	"github.com/sirupsen/logrus"
	logrusadapter "logur.dev/adapter/logrus"
	"logur.dev/logur"
)

func main() {
	// ...
	logger := logur.LoggerToKV(logrusadapter.New(logrus.New()))
	clientOptions := client.Options{
		Logger: logger,
	}
	temporalClient, err := client.Dial(clientOptions)
	// ...
}
Content is currently unavailable.
Content is currently unavailable.
Logging in Workers and Clients

The Worker comes with a default logger, which defaults to log any messages with level INFO and higher to STDERR using console.error.

The log levels, in increasing order of severity, are TRACE, DEBUG, INFO, WARN, and ERROR.
Customizing the default logger
Temporal uses a DefaultLogger that implements the basic interface:
- TypeScript
- JavaScript
import { Runtime, DefaultLogger } from "@temporalio/worker";

const logger = new DefaultLogger("WARN", ({ level, message }) => {
  console.log(`Custom logger: ${level} — ${message}`);
});

Runtime.install({ logger });
import { Runtime, DefaultLogger } from "@temporalio/worker";

const logger = new DefaultLogger("WARN", ({ level, message }) => {
  console.log(`Custom logger: ${level} — ${message}`);
});

Runtime.install({ logger });
The previous code example sets the default logger to only log messages with level WARN and higher.
Accumulate logs for testing and reporting
- TypeScript
- JavaScript
import { DefaultLogger, LogEntry } from "@temporalio/worker";

const logs: LogEntry[] = [];
const logger = new DefaultLogger("TRACE", (entry) => logs.push(entry));

logger.debug("hey", { a: 1 });
logger.info("ho");
logger.warn("lets", { a: 1 });
logger.error("go");
import { DefaultLogger } from "@temporalio/worker";

const logs = [];
const logger = new DefaultLogger("TRACE", (entry) => logs.push(entry));

logger.debug("hey", { a: 1 });
logger.info("ho");
logger.warn("lets", { a: 1 });
logger.error("go");
A common logging use case is logging to a file to be picked up by a collector like the Datadog Agent.
- TypeScript
- JavaScript
import { Runtime } from "@temporalio/worker";
import winston from "winston";

const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.File({ filename: "/path/to/worker.log" })],
});

Runtime.install({ logger });
import { Runtime } from "@temporalio/worker";
import winston from "winston";

const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.File({ filename: "/path/to/worker.log" })],
});

Runtime.install({ logger });
Visibility
The term Visibility, within the Temporal Platform, refers to the subsystems and APIs that enable an operator to view Workflow Executions that currently exist within a Cluster.
Search Attributes
The typical method of retrieving a Workflow Execution is by its Workflow Id.
However, sometimes you'll want to retrieve one or more Workflow Executions based on another property. For example, imagine you want to get all Workflow Executions of a certain type that have failed within a time range, so that you can start new ones with the same arguments.
You can do this with Search Attributes.
- Default Search Attributes like WorkflowType, StartTime, and ExecutionStatus are automatically added to Workflow Executions.
- Custom Search Attributes can contain their own domain-specific data (like customerId or numItems).
  - A few generic Custom Search Attributes like CustomKeywordField and CustomIntField are created by default in Temporal's Docker Compose.
The steps to using custom Search Attributes are:

- Create a new Search Attribute in your Cluster using tctl or the Cloud UI.
- Set the value of the Search Attribute for a Workflow Execution:
  - On the Client, by including it as an option when starting the Execution.
  - In the Workflow, by calling UpsertSearchAttributes.
- Read the value of the Search Attribute:
  - On the Client, by calling DescribeWorkflow.
  - In the Workflow, by looking at WorkflowInfo.
- Query Workflow Executions by the Search Attribute using a List Filter:
  - In tctl.
  - In code, by calling ListWorkflowExecutions.
Here is how to query Workflow Executions:
- Go
- Java
- PHP
- Python
- TypeScript
Use Client.ListWorkflow.
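A minimal sketch, assuming an existing Client c and a context ctx; the Query string is a List Filter:

import (
	"fmt"

	"go.temporal.io/api/workflowservice/v1"
)

// ...
resp, err := c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
	Query: `ExecutionStatus = "Running"`,
})
if err != nil {
	return err
}
for _, execution := range resp.GetExecutions() {
	fmt.Println(execution.GetExecution().GetWorkflowId())
}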
Content is currently unavailable.
Content is currently unavailable.
Use WorkflowService.listWorkflowExecutions:
import { Connection } from "@temporalio/client";

const connection = await Connection.connect();
const response = await connection.workflowService.listWorkflowExecutions({
  query: `ExecutionStatus = "Running"`,
});
where query is a List Filter.
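Results are paginated. The following is a sketch of walking every page using the response's nextPageToken, continuing from the connection above:

let nextPageToken: Uint8Array | undefined;
do {
  const page = await connection.workflowService.listWorkflowExecutions({
    query: `ExecutionStatus = "Running"`,
    nextPageToken,
  });
  for (const info of page.executions ?? []) {
    console.log(info.execution?.workflowId);
  }
  nextPageToken = page.nextPageToken?.length ? page.nextPageToken : undefined;
} while (nextPageToken);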
Custom Search Attributes
After you've created custom Search Attributes in your Cluster (using tctl or the Cloud UI), you can set the values of the custom Search Attributes when starting a Workflow.
- Go
- Java
- PHP
- Python
- TypeScript
Provide key-value pairs in StartWorkflowOptions.SearchAttributes.

Search Attributes are represented as map[string]interface{}.
The values in the map must correspond to the Search Attribute's value type:

- Bool = bool
- Datetime = time.Time
- Double = float64
- Int = int64
- Keyword = string
- Text = string
If you had custom Search Attributes CustomerId of type Keyword and MiscData of type Text, you would provide string values:
func (c *Client) CallYourWorkflow(ctx context.Context, workflowID string, payload map[string]interface{}) error {
	// ...
	searchAttributes := map[string]interface{}{
		"CustomerId": payload["customer"],
		"MiscData":   payload["miscData"],
	}
	options := client.StartWorkflowOptions{
		SearchAttributes: searchAttributes,
		// ...
	}
	we, err := c.Client.ExecuteWorkflow(ctx, options, app.YourWorkflow, payload)
	// ...
}
Content is currently unavailable.
Content is currently unavailable.
To set custom Search Attributes, use the search_attributes parameter of the start_workflow() method.
handle = await client.start_workflow(
    "your-workflow-name",
    id="your-workflow-id",
    task_queue="your-task-queue",
    search_attributes={"Your-Custom-Keyword-Field": ["value"]},
)
Use WorkflowOptions.searchAttributes.
search-attributes/src/client.ts
- TypeScript
- JavaScript
const handle = await client.start(example, {
  taskQueue: 'search-attributes',
  workflowId: 'search-attributes-example-0',
  searchAttributes: {
    CustomIntField: [2],
    CustomKeywordField: ['keywordA', 'keywordB'],
    CustomBoolField: [true],
    CustomDatetimeField: [new Date()],
    CustomStringField: [
      'String field is for text. When queried, it will be tokenized for partial match. StringTypeField cannot be used in Order By',
    ],
  },
});
const { searchAttributes } = await handle.describe();
const handle = await client.start(example, {
  taskQueue: 'search-attributes',
  workflowId: 'search-attributes-example-0',
  searchAttributes: {
    CustomIntField: [2],
    CustomKeywordField: ['keywordA', 'keywordB'],
    CustomBoolField: [true],
    CustomDatetimeField: [new Date()],
    CustomStringField: [
      'String field is for text. When queried, it will be tokenized for partial match. StringTypeField cannot be used in Order By',
    ],
  },
});
const { searchAttributes } = await handle.describe();
The type of searchAttributes is Record<string, string[] | number[] | boolean[] | Date[]>.
Upsert Search Attributes
You can upsert Search Attributes to add or update Search Attributes from within Workflow code.
- Go
- Java
- PHP
- Python
- TypeScript
In advanced cases, you may want to dynamically update these attributes as the Workflow progresses. UpsertSearchAttributes is used to add or update Search Attributes from within Workflow code; it merges the new attributes into the Workflow's existing Search Attribute map.
Consider this example Workflow code:
func YourWorkflow(ctx workflow.Context, input string) error {
	attr1 := map[string]interface{}{
		"CustomIntField":  1,
		"CustomBoolField": true,
	}
	workflow.UpsertSearchAttributes(ctx, attr1)

	attr2 := map[string]interface{}{
		"CustomIntField":     2,
		"CustomKeywordField": "seattle",
	}
	workflow.UpsertSearchAttributes(ctx, attr2)

	return nil
}
After the second call to UpsertSearchAttributes, the map will contain:
map[string]interface{}{
	"CustomIntField":     2, // last update wins
	"CustomBoolField":    true,
	"CustomKeywordField": "seattle",
}
Content is currently unavailable.
Content is currently unavailable.
To upsert custom Search Attributes, use the upsert_search_attributes() method. The keys are added to or replace the existing Search Attributes, similar to dict.update().
workflow.upsert_search_attributes({"Your-Custom-Keyword-Field": ["new-value"]})
Inside a Workflow, we can read from WorkflowInfo.searchAttributes and call upsertSearchAttributes:
search-attributes/src/workflows.ts
- TypeScript
- JavaScript
import { upsertSearchAttributes, workflowInfo } from '@temporalio/workflow';
import type { SearchAttributes } from '@temporalio/common';

export async function example(): Promise<SearchAttributes> {
  const customInt = (workflowInfo().searchAttributes.CustomIntField?.[0] as number) || 0;
  upsertSearchAttributes({
    // overwrite the existing CustomIntField: [2]
    CustomIntField: [customInt + 1],
    // delete the existing CustomBoolField: [true]
    CustomBoolField: [],
    // add a new value
    CustomDoubleField: [3.14],
  });
  return workflowInfo().searchAttributes;
}
import { upsertSearchAttributes, workflowInfo } from '@temporalio/workflow';

export async function example() {
  const customInt = workflowInfo().searchAttributes.CustomIntField?.[0] || 0;
  upsertSearchAttributes({
    // overwrite the existing CustomIntField: [2]
    CustomIntField: [customInt + 1],
    // delete the existing CustomBoolField: [true]
    CustomBoolField: [],
    // add a new value
    CustomDoubleField: [3.14],
  });
  return workflowInfo().searchAttributes;
}
Remove Search Attribute
To remove a Search Attribute that was previously set, set it to an empty array: [].
- Go
- Java
- PHP
- Python
- TypeScript
There is no support for removing a field. However, to achieve a similar effect, set the field to some placeholder value. For example, you could set CustomKeywordField to impossibleVal. Then searching CustomKeywordField != 'impossibleVal' will match Workflows with CustomKeywordField not equal to impossibleVal, which includes Workflows without the CustomKeywordField set.
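A minimal sketch of this workaround from inside Workflow code, reusing the impossibleVal sentinel from the example above:

// Mark the field with a sentinel value instead of removing it.
workflow.UpsertSearchAttributes(ctx, map[string]interface{}{
	"CustomKeywordField": "impossibleVal",
})

List Filters can then exclude the sentinel with CustomKeywordField != 'impossibleVal'.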
Content is currently unavailable.
To remove a Search Attribute that was previously set, set it to an empty array [].
To remove a Search Attribute, use the upsert_search_attributes() method with an empty list as its value.
workflow.upsert_search_attributes({"Your-Custom-Keyword-Field": []})
import { upsertSearchAttributes } from "@temporalio/workflow";

async function yourWorkflow() {
  upsertSearchAttributes({ CustomIntField: [1, 2, 3] });
  // ... later, to remove:
  upsertSearchAttributes({ CustomIntField: [] });
}