Simple guides introducing Fabrica with hands-on examples.
Fabrica provides a CloudEvents-compliant event system for building event-driven applications. The event system enables decoupled communication between components through a publish-subscribe pattern.
The event system consists of:
Fabrica automatically publishes two categories of events:
Published automatically when resources are created, updated, patched, or deleted:
{prefix}.{resource}.created - Resource creation{prefix}.{resource}.updated - Resource updates via PUT{prefix}.{resource}.patched - Resource updates via PATCH{prefix}.{resource}.deleted - Resource deletionPublished automatically when resource conditions change:
{prefix}.condition.{conditiontype} - When a condition status changesimport "github.com/openchami/fabrica/pkg/events"
// Create in-memory event bus
eventBus := events.NewInMemoryEventBus(
1000, // buffer size
10, // worker count
)
// Start processing events
eventBus.Start()
defer eventBus.Close()
// Create a simple event
event, err := events.NewEvent(
"io.example.user.created", // event type
"/users/user-123", // source
userData, // data payload
)
if err != nil {
log.Fatal(err)
}
// Publish the event
err = eventBus.Publish(context.Background(), *event)
if err != nil {
log.Fatal(err)
}
// Subscribe to specific event type
id, err := eventBus.Subscribe(
"io.example.user.created",
func(ctx context.Context, event events.Event) error {
fmt.Printf("User created: %s\n", event.ID())
return nil
},
)
// Unsubscribe when done
defer eventBus.Unsubscribe(id)
Configure Fabrica’s automatic event publishing with EventConfig:
import "github.com/openchami/fabrica/pkg/events"
// Configure event system
config := &events.EventConfig{
Enabled: true, // Enable/disable all events
LifecycleEventsEnabled: true, // Enable CRUD operation events
ConditionEventsEnabled: true, // Enable condition change events
EventTypePrefix: "io.fabrica", // Event type prefix
ConditionEventPrefix: "io.fabrica.condition", // Condition event prefix
Source: "inventory-api", // Event source identifier
}
// Apply configuration globally
events.SetEventConfig(config)
Configure events via environment variables in generated servers:
# Enable/disable event publishing
FABRICA_EVENTS_ENABLED=true
FABRICA_LIFECYCLE_EVENTS_ENABLED=true
FABRICA_CONDITION_EVENTS_ENABLED=true
# Customize event prefixes
FABRICA_EVENT_PREFIX=io.mycompany.inventory
FABRICA_CONDITION_EVENT_PREFIX=io.mycompany.inventory.condition
FABRICA_EVENT_SOURCE=production-api
With prefix io.fabrica and resource Device:
Lifecycle Events:
io.fabrica.device.created - Device creationio.fabrica.device.updated - Device update (PUT)io.fabrica.device.patched - Device patch (PATCH)io.fabrica.device.deleted - Device deletionCondition Events:
io.fabrica.condition.ready - Ready condition changedio.fabrica.condition.healthy - Healthy condition changedio.fabrica.condition.available - Available condition changedFabrica uses the CloudEvents specification for all events.
Every event contains:
For resource-specific events, use NewResourceEvent:
event, err := events.NewResourceEvent(
"io.example.device.connected", // event type
"Device", // resource kind
"dev-abc123", // resource UID
deviceData, // data payload
)
This automatically:
/resources/{kind}/{uid}resourcekind extension attributeresourceuid extension attributefunc handleEvent(ctx context.Context, event events.Event) error {
// Basic attributes
fmt.Printf("ID: %s\n", event.ID())
fmt.Printf("Type: %s\n", event.Type())
fmt.Printf("Source: %s\n", event.Source())
fmt.Printf("Time: %s\n", event.Time())
// Resource extensions (if present)
kind := event.ResourceKind()
uid := event.ResourceUID()
// Event data
var data MyDataType
err := event.DataAs(&data)
if err != nil {
return err
}
return nil
}
Subscribe to multiple event types using wildcards:
*)Matches exactly one segment:
// Matches: io.example.user.created, io.example.user.updated
// Does NOT match: io.example.user.group.created
eventBus.Subscribe("io.example.user.*", handler)
**)Matches one or more segments:
// Matches: io.example.user.created
// io.example.user.group.created
// io.example.user.x.y.z
eventBus.Subscribe("io.example.user.**", handler)
// All events
eventBus.Subscribe("**", handler)
// All events for a specific resource kind
eventBus.Subscribe("io.example.device.**", handler)
// Specific operation across all resources
eventBus.Subscribe("io.example.*.created", handler)
// Exact match
eventBus.Subscribe("io.example.device.connected", handler)
Use reverse domain notation:
{domain}.{application}.{resource}.{action}
Examples:
io.example.user.createdio.example.device.connectedio.example.order.shippedio.example.payment.completedCreate/Update/Delete:
events.NewResourceEvent("io.example.device.created", kind, uid, resource)
events.NewResourceEvent("io.example.device.updated", kind, uid, resource)
events.NewResourceEvent("io.example.device.deleted", kind, uid, resource)
State Changes:
events.NewResourceEvent("io.example.device.connected", kind, uid, resource)
events.NewResourceEvent("io.example.device.disconnected", kind, uid, resource)
events.NewResourceEvent("io.example.device.failed", kind, uid, resource)
Operations:
events.NewResourceEvent("io.example.order.shipped", kind, uid, order)
events.NewResourceEvent("io.example.payment.processed", kind, uid, payment)
Fabrica automatically publishes events when resource conditions change, following Kubernetes condition patterns.
Conditions represent the current state of a resource:
type Condition struct {
Type string `json:"type"` // "Ready", "Healthy", "Available"
Status string `json:"status"` // "True", "False", "Unknown"
LastTransitionTime time.Time `json:"lastTransitionTime"` // When status last changed
Reason string `json:"reason,omitempty"` // Machine-readable reason
Message string `json:"message,omitempty"` // Human-readable message
}
When you update resource conditions, events are published automatically:
import "github.com/openchami/fabrica/pkg/resource"
// This will publish a condition event if the status changes
changed := resource.SetResourceCondition(ctx, device,
"Ready", // condition type
"True", // status
"DeviceOnline", // reason
"Device is operational" // message
)
if changed {
// Event published: "io.fabrica.condition.ready"
log.Println("Ready condition changed - event published")
}
Condition events use the CloudEvents format:
{
"specversion": "1.0",
"type": "io.fabrica.condition.ready",
"source": "inventory-api",
"id": "condition-event-abc123",
"time": "2025-10-21T15:30:45Z",
"datacontenttype": "application/json",
"subject": "devices/dev-123",
"data": {
"resourceKind": "Device",
"resourceUID": "dev-123",
"condition": {
"type": "Ready",
"status": "True",
"reason": "DeviceOnline",
"message": "Device is operational",
"lastTransitionTime": "2025-10-21T15:30:45Z"
}
}
}
Subscribe to condition events using wildcards:
// All condition events
eventBus.Subscribe("io.fabrica.condition.**", handleConditionEvent)
// Specific condition type
eventBus.Subscribe("io.fabrica.condition.ready", handleReadyCondition)
// Condition events for specific resource
eventBus.Subscribe("io.fabrica.condition.*", func(ctx context.Context, event events.Event) error {
var conditionData struct {
ResourceKind string `json:"resourceKind"`
ResourceUID string `json:"resourceUID"`
Condition struct {
Type string `json:"type"`
Status string `json:"status"`
Reason string `json:"reason"`
} `json:"condition"`
}
if err := event.DataAs(&conditionData); err != nil {
return err
}
if conditionData.ResourceKind == "Device" && conditionData.Condition.Type == "Ready" {
if conditionData.Condition.Status == "False" {
// Send alert - device became not ready
sendDeviceAlert(conditionData.ResourceUID, conditionData.Condition.Reason)
}
}
return nil
})
Standard Kubernetes-style conditions:
Ready - Resource is ready to serve requestsAvailable - Resource is available for useProgressing - Resource is making progress toward desired stateCustom application conditions:
Healthy - Health check statusConnected - Network connectivity statusAuthenticated - Authentication statusValidated - Data validation status| Aspect | Lifecycle Events | Condition Events |
|---|---|---|
| Trigger | CRUD operations | Condition status changes |
| Frequency | Every operation | Only when status changes |
| Data | Full resource | Condition + resource context |
| Use Cases | Audit, integration | Monitoring, alerting |
| Examples | device.created, user.updated |
condition.ready, condition.healthy |
The in-memory implementation provides:
eventBus := events.NewInMemoryEventBus(
bufferSize, // Event queue buffer size (default: 1000)
workerCount, // Number of worker goroutines (default: 10)
)
Buffer Size:
Worker Count:
Advantages:
Limitations:
Event handlers should return errors:
handler := func(ctx context.Context, event events.Event) error {
if err := processEvent(event); err != nil {
// Error is logged but doesn't stop processing
return fmt.Errorf("failed to process event: %w", err)
}
return nil
}
Use context for cancellation and timeouts:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := eventBus.Publish(ctx, event)
if err != nil {
// Handle timeout or cancellation
}
Add custom attributes to events:
event.SetExtension("tenant", "acme-corp")
event.SetExtension("priority", "high")
event.SetExtension("region", "us-west")
// Access in handler
tenant := event.Extensions()["tenant"]
One event can trigger multiple handlers:
// Logging handler
eventBus.Subscribe("**", loggingHandler)
// Metrics handler
eventBus.Subscribe("**", metricsHandler)
// Business logic handler
eventBus.Subscribe("io.example.order.*", orderHandler)
type EventStore struct {
bus events.EventBus
}
func (s *EventStore) SaveAggregate(aggregate Aggregate) error {
// Save events
for _, event := range aggregate.Events() {
if err := s.bus.Publish(ctx, event); err != nil {
return err
}
}
return nil
}
// Command side publishes events
func CreateUser(cmd CreateUserCommand) error {
user := newUser(cmd)
event, _ := events.NewResourceEvent(
"io.example.user.created",
"User",
user.UID,
user,
)
return eventBus.Publish(ctx, event)
}
// Query side subscribes to events
eventBus.Subscribe("io.example.user.**", func(ctx context.Context, e events.Event) error {
// Update read model
return updateReadModel(e)
})
// Order saga
eventBus.Subscribe("io.example.order.created", func(ctx context.Context, e events.Event) error {
// Reserve inventory
// Process payment
// Ship order
return nil
})
** for metrics/loggingpackage main
import (
"context"
"fmt"
"github.com/openchami/fabrica/pkg/events"
)
type Device struct {
UID string
Name string
Status string
}
func main() {
// Create event bus
bus := events.NewInMemoryEventBus(1000, 10)
bus.Start()
defer bus.Close()
// Subscribe to all device events
bus.Subscribe("io.example.device.**", func(ctx context.Context, e events.Event) error {
fmt.Printf("[LOG] Device event: %s\n", e.Type())
return nil
})
// Subscribe to connected events
bus.Subscribe("io.example.device.connected", func(ctx context.Context, e events.Event) error {
var device Device
e.DataAs(&device)
fmt.Printf("[NOTIFY] Device %s is now online\n", device.Name)
return nil
})
// Publish event
device := Device{
UID: "dev-123",
Name: "Sensor-01",
Status: "connected",
}
event, _ := events.NewResourceEvent(
"io.example.device.connected",
"Device",
device.UID,
device,
)
bus.Publish(context.Background(), *event)
// Output:
// [LOG] Device event: io.example.device.connected
// [NOTIFY] Device Sensor-01 is now online
}