Worker Module
Overview
Yokai provides a fxworker module, providing a workers pool to your application.
It wraps the worker module, based on sync.
It comes with:
- automatic panic recovery
- automatic logging
- automatic metrics
- possibility to defer workers
- possibility to limit workers max execution attempts
Installation
First install the module:
Then activate it in your application bootstrapper:
package internal
import (
"github.com/ankorstore/yokai/fxcore"
"github.com/ankorstore/yokai/fxworker"
)
var Bootstrapper = fxcore.NewBootstrapper().WithOptions(
// load fxworker module
fxworker.FxWorkerModule,
// ...
)
Configuration
modules:
worker:
defer: 0.1 # threshold in seconds to wait before starting all workers, immediate start by default
attempts: 3 # max execution attempts in case of failures for all workers, no restart by default
metrics:
collect:
enabled: true # to collect metrics about workers executions
namespace: foo # workers metrics namespace (empty by default)
subsystem: bar # workers metrics subsystem (empty by default)
Usage
This module provides the possibility to register several Worker implementations, with an optional list of WorkerExecutionOption.
They will be collected and given by Yokai to the WorkerPool in its dependency injection system.
Workers creation
You can create your workers by implementing the Worker interface.
For example:
package worker
import (
"context"
"time"
"github.com/ankorstore/yokai/config"
"github.com/ankorstore/yokai/worker"
)
type ExampleWorker struct {
config *config.Config
}
func NewExampleWorker(config *config.Config) *ExampleWorker {
return &ExampleWorker{
config: config,
}
}
func (w *ExampleWorker) Name() string {
return "example-worker"
}
func (w *ExampleWorker) Run(ctx context.Context) error {
logger := worker.CtxLogger(ctx)
for {
select {
case <-ctx.Done():
logger.Info().Msg("stopping")
return nil
default:
logger.Info().Msg("running")
// The sleep interval can be configured in the application config files.
time.Sleep(time.Duration(w.config.GetFloat64("config.example-worker.interval")) * time.Second)
}
}
}
And the corresponding example configuration:
Workers registration
You can register your workers with the AsWorker()
function:
package internal
import (
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
w "github.com/foo/bar/worker"
"go.uber.org/fx"
)
func Register() fx.Option {
return fx.Options(
fxworker.AsWorker(
w.NewExampleWorker, // register the ExampleWorker
worker.WithDeferredStartThreshold(1), // with a deferred start of 1 second
worker.WithMaxExecutionsAttempts(2), // and 2 max execution attempts
),
// ...
)
}
Workers execution
Yokai will automatically start the WorkerPool containing the registered workers.
You can get, in real time, the status of your workers executions on the core dashboard:
Health Check
This module provides a ready to use WorkerProbe, to be used by the health check module.
It will ensure that the worker pool executions are all in healthy status.
You just need to register it:
package internal
import (
"github.com/ankorstore/yokai/fxhealthcheck"
"github.com/ankorstore/yokai/worker/healthcheck"
"go.uber.org/fx"
)
func Register() fx.Option {
return fx.Options(
// register the WorkerProbe probe for startup, liveness and readiness checks
fxhealthcheck.AsCheckerProbe(healthcheck.NewWorkerProbe),
// ...
)
}
Logging
To get logs correlation in your workers, you need to retrieve the logger from the context with log.CtxLogger()
:
You can also use the shortcut function worker.CtxLogger()
:
As a result, log records will have the worker
name and workerExecutionID
fields added automatically:
INF example message module=worker service=app worker=example-worker workerExecutionID=b57be88f-163f-4a81-bf24-a389c93d804b
The workers logging will be based on the log module configuration.
Tracing
To get traces correlation in your workers, you need to retrieve the tracer provider from the context with trace.CtxTracerProvider()
:
ctx, span := trace.CtxTracerProvider(ctx).Tracer("example tracer").Start(ctx, "example span")
defer span.End()
You can also use the shortcut function worker.CtxTracer()
:
As a result, in your application trace spans attributes:
service.name: app
Worker: example-worker
WorkerExecutionID: b57be88f-163f-4a81-bf24-a389c93d804b
...
The workers tracing will be based on the trace module configuration.
Metrics
You can enable workers executions automatic metrics with modules.worker.metrics.collect.enable=true
:
modules:
worker:
metrics:
collect:
enabled: true # to collect metrics about workers executions
namespace: foo # workers metrics namespace (empty by default)
subsystem: bar # workers metrics subsystem (empty by default)
This will collect metrics about:
- workers
start
andrestart
- workers
successes
- workers
failures
For example, after starting Yokai's workers pool, the core HTTP server will expose in the configured metrics endpoint: