Skip to main content

Go SDK Selectors

Overview

In Go, the select statement lets a goroutine wait on multiple communication operations. A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.

However, a normal Go select statement can not be used inside of Workflows directly because of the random nature. Temporal's Go SDK Selectors are similar and act as a replacement. They can block on sending and receiving from Channels but as a bonus can listen on Future deferred work. Usage of Selectors to defer and process work (in place of Go's select) are necessary in order to ensure deterministic Workflow code execution (though using select in Activity code is fine).

Full API Example

The API is sufficiently different from select that it bears documenting:

func SampleWorkflow(ctx workflow.Context) error {
// standard Workflow setup code omitted...

// API Example: declare a new selector
selector := workflow.NewSelector(ctx)

// API Example: defer code execution until the Future that represents Activity result is ready
work := workflow.ExecuteActivity(ctx, ExampleActivity)
selector.AddFuture(work, func(f workflow.Future) {
// deferred code omitted...
})

// more parallel timers and activities initiated...

// API Example: receive information from a Channel
var signalVal string
channel := workflow.GetSignalChannel(ctx, channelName)
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
// matching on the channel doesn't consume the message.
// So it has to be explicitly consumed here
c.Receive(ctx, &signalVal)
// do something with received information
})

// API Example: block until the next Future is ready to run
// important! none of the deferred code runs until you call selector.Select
selector.Select(ctx)

// Todo: document selector.HasPending
}

Using Selectors with Futures

You usually add Futures after Activities:

    // API Example: defer code execution until after an activity is done
work := workflow.ExecuteActivity(ctx, ExampleActivity)
selector.AddFuture(work, func(f workflow.Future) {
// deferred code omitted...
})

selector.Select(ctx) is the primary mechanism which blocks on and executes Future work. It is intentionally flexible; you may call it conditionally or multiple times:

    // API Example: blocking conditionally
if somecondition != nil {
selector.Select(ctx)
}

// API Example: popping off all remaining Futures
for i := 0; i < len(someArray); i++ {
selector.Select(ctx) // this will wait for one branch
// you can interrupt execution here
}

A Future matches only once per Selector instance even if Select is called multiple times. If multiple items are available, the order of matching is not defined.

Using Selectors with Timers

An important use case of futures is setting up a race between a timer and a pending activity, effectively adding a "soft" timeout that doesn't result in any errors or retries of that activity.

For example, the Timer sample shows how you can write a long running order processing operation where:

  • if processing takes too long, we send out a notification email to user about the delay, but we won't cancel the operation
  • if the operation finishes before the timer fires, then we want to cancel the timer.
var processingDone bool
f := workflow.ExecuteActivity(ctx, OrderProcessingActivity)
selector.AddFuture(f, func(f workflow.Future) {
processingDone = true
// cancel timerFuture
cancelHandler()
})

// use timer future to send notification email if processing takes too long
timerFuture := workflow.NewTimer(childCtx, processingTimeThreshold)
selector.AddFuture(timerFuture, func(f workflow.Future) {
if !processingDone {
// processing is not done yet when timer fires, send notification email
_ = workflow.ExecuteActivity(ctx, SendEmailActivity).Get(ctx, nil)
}
})

// wait the timer or the order processing to finish
selector.Select(ctx)

We create timers with the workflow.NewTimer API.

Using Selectors with Channels

selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {}) is the primary mechanism which receives messages from Channels.

    // API Example: receive information from a Channel
var signalVal string
channel := workflow.GetSignalChannel(ctx, channelName)
selector.AddReceive(channel, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &signalVal)
// do something with received information
})

Merely matching on the channel doesn't consume the message; it has to be explicitly consumed with a c.Receive(ctx, &signalVal) call.

Querying Selector State

You can use the selector.HasPending API to ensure that signals are not lost when a Workflow is closed (e.g. by ContinueAsNew).

Learn More

Usage of Selectors is best learned by example:

  • Setting up a race condition between an Activity and a Timer, and conditionally execute (Timer example)
  • Receiving information in a Channel (Mutex example)
  • Looping through a list of work and scheduling them all in parallel (DSL example)
  • Executing activities in parallel, pick the first result, cancel remainder (Pick First example)