Worker Module

Overview

Yokai provides the fxworker module, which adds a worker pool to your application.

It wraps the worker module, which is built on Go's sync package.

It comes with:

  • automatic panic recovery
  • automatic logging
  • automatic metrics
  • optional deferred start of workers
  • optional limit on the maximum execution attempts of workers

Installation

First install the module:

go get github.com/ankorstore/yokai/fxworker

Then activate it in your application bootstrapper:

internal/bootstrap.go
package internal

import (
    "github.com/ankorstore/yokai/fxcore"
    "github.com/ankorstore/yokai/fxworker"
)

var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
    // load fxworker module
    fxworker.FxWorkerModule,
    // ...
)

Configuration

configs/config.yaml
modules:
  worker:
    defer: 0.1             # threshold in seconds to wait before starting all workers, immediate start by default
    attempts: 3            # max execution attempts in case of failures for all workers, no restart by default
    metrics:
      collect:
        enabled: true      # to collect metrics about workers executions
        namespace: foo     # workers metrics namespace (empty by default)
        subsystem: bar     # workers metrics subsystem (empty by default)
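To make the meaning of defer and attempts more concrete, here is a minimal, standard-library-only sketch of the behavior these settings describe: wait for a threshold, then retry a failing or panicking function up to a maximum number of attempts. It is an illustration under stated assumptions, not Yokai's implementation; safeRun, runWithAttempts and attemptsDemo are hypothetical names that do not exist in the module.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// safeRun (hypothetical) calls run and converts a panic into an error.
func safeRun(ctx context.Context, run func(context.Context) error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()

	return run(ctx)
}

// runWithAttempts (hypothetical) waits for deferFor, then calls run until it
// succeeds or maxAttempts is reached. It returns the number of attempts used
// and the last error.
func runWithAttempts(ctx context.Context, deferFor time.Duration, maxAttempts int, run func(context.Context) error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always run at least once
	}

	// Deferred start: wait for the threshold, unless the context ends first.
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-time.After(deferFor):
	}

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = safeRun(ctx, run); err == nil {
			return attempt, nil
		}
	}

	return maxAttempts, err
}

// attemptsDemo panics on the first call, fails on the second and succeeds on
// the third, with a 10ms deferred start and a limit of 3 attempts.
func attemptsDemo() (int, error) {
	calls := 0

	return runWithAttempts(context.Background(), 10*time.Millisecond, 3, func(ctx context.Context) error {
		calls++
		switch calls {
		case 1:
			panic("boom")
		case 2:
			return errors.New("transient failure")
		default:
			return nil
		}
	})
}

func main() {
	attempts, err := attemptsDemo()
	fmt.Println(attempts, err)
}
```

With a limit of 3, the demo function survives one panic and one error before succeeding; with a limit of 2, it would stop after the second failure and return that error.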

Usage

This module lets you register several Worker implementations, each with an optional list of WorkerExecutionOption.

Yokai collects them through its dependency injection system and provides them to the WorkerPool.

Workers creation

You can create your workers by implementing the Worker interface.

For example:

internal/worker/example.go
package worker

import (
    "context"
    "time"

    "github.com/ankorstore/yokai/config"
    "github.com/ankorstore/yokai/worker"
)

type ExampleWorker struct {
    config *config.Config
}

func NewExampleWorker(config *config.Config) *ExampleWorker {
    return &ExampleWorker{
        config: config,
    }
}

func (w *ExampleWorker) Name() string {
    return "example-worker"
}

func (w *ExampleWorker) Run(ctx context.Context) error {
    logger := worker.CtxLogger(ctx)

    for {
        select {
        case <-ctx.Done():
            logger.Info().Msg("stopping")

            return nil
        default:
            logger.Info().Msg("running")

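            // The sleep interval (in seconds) can be configured in the application config files.
            // Scale before converting, so that fractional values such as 0.5 are not truncated to 0.
            time.Sleep(time.Duration(w.config.GetFloat64("config.example-worker.interval") * float64(time.Second)))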
        }
    }
}

And the corresponding example configuration:

configs/config.yaml
config:
  example-worker:
    interval: 3
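In the example above, time.Sleep blocks until the interval has elapsed, so a cancellation is only noticed on the next loop iteration. If a worker should stop promptly, one option is to wait on a ticker and on ctx.Done() in the same select. The sketch below uses only the standard library; runEvery and runEveryDemo are hypothetical helpers, not part of Yokai.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery (hypothetical) calls tick once per interval until ctx is
// cancelled, and returns how many times tick was called.
func runEvery(ctx context.Context, interval time.Duration, tick func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	count := 0
	for {
		select {
		case <-ctx.Done():
			return count
		case <-ticker.C:
			// Both channels may be ready at once: re-check the context
			// so that no tick runs after cancellation.
			if ctx.Err() != nil {
				return count
			}

			tick()
			count++
		}
	}
}

// runEveryDemo cancels the context from inside the third tick and reports
// the number of ticks that ran.
func runEveryDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0

	return runEvery(ctx, 5*time.Millisecond, func() {
		calls++
		if calls == 3 {
			cancel()
		}
	})
}

func main() {
	fmt.Println("ticks:", runEveryDemo())
}
```

Inside a worker, the body of Run could delegate to such a helper, passing the context it receives and the interval read from the configuration.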

Workers registration

You can register your workers with the AsWorker() function:

internal/register.go
package internal

import (
    "github.com/ankorstore/yokai/fxworker"
    "github.com/ankorstore/yokai/worker"
    w "github.com/foo/bar/worker"
    "go.uber.org/fx"
)

func Register() fx.Option {
    return fx.Options(
        fxworker.AsWorker(
            w.NewExampleWorker,                   // register the ExampleWorker
            worker.WithDeferredStartThreshold(1), // with a deferred start of 1 second
            worker.WithMaxExecutionsAttempts(2),  // and 2 max execution attempts 
        ),
        // ...
    )
}

Workers execution

Yokai will automatically start the WorkerPool containing the registered workers.

You can follow the status of your worker executions, in real time, on the core dashboard.

Middlewares

This module provides middleware support for workers, allowing you to add behaviors without modifying the worker's core implementation.

Middlewares wrap a worker's Run method and can perform actions before and after the worker execution, or even modify the execution flow.
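The wrapping idea can be sketched with the standard library alone. In the block below, HandlerFunc and MiddlewareFunc are local stand-ins that mirror the shape of the worker.HandlerFunc and worker.MiddlewareFunc types used on this page, and chain, tracing and runChainDemo are hypothetical helpers. The order in which Yokai itself applies several middlewares is not stated here, so the ordering shown is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
)

// HandlerFunc is a local stand-in for a worker's Run function.
type HandlerFunc func(ctx context.Context) error

// MiddlewareFunc is a local stand-in: it wraps a handler with extra behavior.
type MiddlewareFunc func(next HandlerFunc) HandlerFunc

// chain (hypothetical) wraps h so that the first middleware in mws becomes
// the outermost layer.
func chain(h HandlerFunc, mws ...MiddlewareFunc) HandlerFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}

	return h
}

// tracing returns a middleware that records a step before and after calling
// the next handler.
func tracing(name string, steps *[]string) MiddlewareFunc {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context) error {
			*steps = append(*steps, name+": before")
			err := next(ctx)
			*steps = append(*steps, name+": after")

			return err
		}
	}
}

// runChainDemo runs a fake worker wrapped by two middlewares and returns the
// recorded steps in execution order.
func runChainDemo() ([]string, error) {
	var steps []string

	run := func(ctx context.Context) error {
		steps = append(steps, "worker: run")

		return nil
	}

	err := chain(run, tracing("outer", &steps), tracing("inner", &steps))(context.Background())

	return steps, err
}

func main() {
	steps, err := runChainDemo()
	fmt.Println(steps, err)
}
```

Each middleware sees the call on the way in and the result on the way out, which is what allows behaviors such as timeouts to be added without touching the worker.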

Implementing Middlewares

To create a middleware, you need to implement the worker.Middleware interface:

internal/worker/middleware/timeout.go
package middleware

import (
    "context"
    "errors"
    "time"

    "github.com/ankorstore/yokai/config"
    "github.com/ankorstore/yokai/worker"
)

// TimeoutMiddleware implements the worker.Middleware interface
type TimeoutMiddleware struct {
    timeout time.Duration
}

// NewTimeoutMiddleware creates a new TimeoutMiddleware with timeout from config
func NewTimeoutMiddleware(cfg *config.Config) *TimeoutMiddleware {
    timeout := cfg.GetDuration("modules.worker.middleware.timeout")

    return &TimeoutMiddleware{
        timeout: timeout,
    }
}

// Name returns the middleware name
func (m *TimeoutMiddleware) Name() string {
    return "timeout-middleware"
}

// Handle returns the middleware function
func (m *TimeoutMiddleware) Handle() worker.MiddlewareFunc {
    return func(next worker.HandlerFunc) worker.HandlerFunc {
        return func(ctx context.Context) error {
            // Create a timeout context
            timeoutCtx, cancel := context.WithTimeout(ctx, m.timeout)
            defer cancel()

            // Create a buffered channel to receive the result of the worker execution,
            // so that the goroutine below can still send and exit after a timeout
            done := make(chan error, 1)

            // Execute the worker in a goroutine
            go func() {
                done <- next(timeoutCtx)
            }()

            // Wait for either the worker to complete or the timeout to occur
            select {
            case err := <-done:
                return err
            case <-timeoutCtx.Done():
                if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) {
                    return errors.New("worker execution timed out")
                }

                return timeoutCtx.Err()
            }
        }
    }
}

Using Middlewares with AsWorker()

You can register your middlewares along with your workers using the AsWorker() function:

internal/register.go
package internal

import (
    "github.com/ankorstore/yokai/fxworker"
    "github.com/ankorstore/yokai/worker"
    w "github.com/foo/bar/worker"
    m "github.com/foo/bar/worker/middleware"
    "go.uber.org/fx"
)

func Register() fx.Option {
    return fx.Options(
        fxworker.AsWorker(
            w.NewExampleWorker,                   // register the ExampleWorker
            m.NewTimeoutMiddleware,               // register the middleware
            worker.WithDeferredStartThreshold(1), // with a deferred start of 1 second
        ),
        // ...
    )
}

Health Check

This module provides a ready-to-use WorkerProbe, to be used by the health check module.

It checks that all worker pool executions are in a healthy state.

You just need to register it:

internal/register.go
package internal

import (
    "github.com/ankorstore/yokai/fxhealthcheck"
    "github.com/ankorstore/yokai/worker/healthcheck"
    "go.uber.org/fx"
)

func Register() fx.Option {
    return fx.Options(
        // register the WorkerProbe probe for startup, liveness and readiness checks
        fxhealthcheck.AsCheckerProbe(healthcheck.NewWorkerProbe),
        // ...
    )
}

Logging

To get log correlation in your workers, retrieve the logger from the context with log.CtxLogger():

log.CtxLogger(ctx).Info().Msg("example message")

You can also use the shortcut function worker.CtxLogger():

worker.CtxLogger(ctx)

As a result, log records automatically include the worker and workerExecutionID fields:

INF example message module=worker service=app worker=example-worker workerExecutionID=b57be88f-163f-4a81-bf24-a389c93d804b

Worker logging follows the log module configuration.

Tracing

To get trace correlation in your workers, retrieve the tracer provider from the context with trace.CtxTracerProvider():

ctx, span := trace.CtxTracerProvider(ctx).Tracer("example tracer").Start(ctx, "example span")
defer span.End()

You can also use the shortcut function worker.CtxTracer():

ctx, span := worker.CtxTracer(ctx).Start(ctx, "example span")
defer span.End()

As a result, your application trace spans will carry the following attributes:

service.name: app
Worker: example-worker
WorkerExecutionID: b57be88f-163f-4a81-bf24-a389c93d804b
...

Worker tracing follows the trace module configuration.

Metrics

You can enable automatic metrics for worker executions with modules.worker.metrics.collect.enabled=true:

configs/config.yaml
modules:
  worker:
    metrics:
      collect:
        enabled: true      # to collect metrics about workers executions
        namespace: foo     # workers metrics namespace (empty by default)
        subsystem: bar     # workers metrics subsystem (empty by default)

This will collect metrics about:

  • workers start and restart
  • workers successes
  • workers failures

For example, after Yokai's worker pool has started, the core HTTP server exposes the following on the configured metrics endpoint:

[GET] /metrics
# ...
# HELP worker_executions_total Total number of workers executions
# TYPE worker_executions_total counter
worker_executions_total{status="started",worker="example-worker"} 1