Simple guides introducing Fabrica with hands-on examples.
Fabrica provides a Kubernetes-style reconciliation framework for building declarative infrastructure management systems. The reconciliation pattern enables automatic convergence of actual state to desired state.
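The reconciliation system consists of reconcilers (which implement the convergence logic for one resource kind, usually by embedding BaseReconciler), a controller (which runs the registered reconcilers), a work queue (with optional rate limiting and backoff), and event-bus integration (which triggers reconciliation when resources change).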
Instead of imperative commands:
// Imperative (bad): the caller has to issue every step itself, in order.
device.Connect()
device.Configure()
device.Start()
Use declarative state:
// Declarative (good): record the desired end state on the resource's Spec.
device.Spec.Desired = "running"
device.Spec.Config = {...}
// Reconciler automatically:
// - Connects if not connected
// - Configures if config changed
// - Starts if not running
┌─────────────────────────────────────┐
│ 1. Event triggers reconciliation │
│ (create/update/periodic) │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ 2. Load current resource state │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ 3. Compare Spec vs Status │
│ (desired vs actual) │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ 4. Take actions to converge │
│ (make actual match desired) │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ 5. Update resource Status │
└──────────────┬──────────────────────┘
│
┌──────────────▼──────────────────────┐
│ 6. Requeue if needed │
│ (periodic check/retry) │
└─────────────────────────────────────┘
// Device is an example resource. It carries its kind and UID together with
// the desired state (Spec) and the actual state (Status) that reconciliation
// compares.
type Device struct {
Kind string `json:"kind"`
UID string `json:"uid"`
Spec DeviceSpec `json:"spec"` // desired state
Status DeviceStatus `json:"status"` // actual state
}
// DeviceSpec describes the state the device should be brought to.
type DeviceSpec struct {
Desired string `json:"desired"` // "running", "stopped"
Config Config `json:"config"` // Config is declared outside this snippet
}
// DeviceStatus records the state the device was last observed in.
type DeviceStatus struct {
State string `json:"state"` // compared against Spec.Desired by the reconciler
LastSeen string `json:"lastSeen"`
}
// GetKind returns the kind recorded on the device.
func (d *Device) GetKind() string {
	return d.Kind
}

// GetUID returns the device's UID.
func (d *Device) GetUID() string {
	return d.UID
}
import "github.com/openchami/fabrica/pkg/reconcile"
// DeviceReconciler reconciles Device resources. It embeds
// reconcile.BaseReconciler for common functionality.
type DeviceReconciler struct {
reconcile.BaseReconciler
}
// Reconcile starts the device when its Spec asks for "running" and its Status
// does not yet say so.
//
// After a successful start the status is set to "running", UpdateStatus is
// called and an "io.example.device.started" event is emitted. A failed start
// is returned as the error. In every non-error case a requeue after five
// minutes is requested so the device is checked periodically.
func (r *DeviceReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
	dev := resource.(*Device)

	// Result used whenever reconciliation finishes without an error.
	periodic := reconcile.Result{RequeueAfter: 5 * time.Minute}

	// Compare desired vs actual state; nothing to do unless a start is pending.
	needsStart := dev.Spec.Desired == "running" && dev.Status.State != "running"
	if !needsStart {
		return periodic, nil
	}

	err := r.startDevice(dev)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Record the new state, then announce it.
	dev.Status.State = "running"
	r.UpdateStatus(ctx, dev)
	r.EmitEvent(ctx, "io.example.device.started", dev)

	return periodic, nil
}
// GetResourceKind reports the resource kind this reconciler handles.
func (r *DeviceReconciler) GetResourceKind() string {
	const kind = "Device"
	return kind
}
import (
"github.com/openchami/fabrica/pkg/events"
"github.com/openchami/fabrica/pkg/reconcile"
"github.com/openchami/fabrica/pkg/storage"
)
// main wires an in-memory event bus and storage into a reconciliation
// controller, registers a DeviceReconciler, starts the controller and then
// blocks forever.
func main() {
	// Event bus
	bus := events.NewInMemoryEventBus(1000, 10)
	bus.Start()
	defer bus.Close()

	// Storage and controller
	store := storage.NewInMemoryStorage()
	ctrl := reconcile.NewController(bus, store)

	// Register a reconciler built on the embedded BaseReconciler
	ctrl.RegisterReconciler(&DeviceReconciler{
		BaseReconciler: reconcile.BaseReconciler{
			EventBus: bus,
			Logger:   reconcile.NewDefaultLogger(),
		},
	})

	// Start controller
	ctrl.Start(context.Background())
	defer ctrl.Stop()

	// Controller now reconciles devices automatically!
	select {}
}
// Reconciler is implemented by types that converge one kind of resource
// toward its desired state.
type Reconciler interface {
// Reconcile brings resource to desired state. The returned Result controls
// requeueing; returning an error triggers an automatic retry.
Reconcile(ctx context.Context, resource interface{}) (Result, error)
// GetResourceKind returns the resource kind handled
GetResourceKind() string
}
// No requeue needed: return the zero Result and a nil error
return reconcile.Result{}, nil
// Immediate requeue
return reconcile.Result{Requeue: true}, nil
// Requeue after delay (here: one minute)
return reconcile.Result{RequeueAfter: 1 * time.Minute}, nil
// Error triggers automatic retry
return reconcile.Result{}, fmt.Errorf("connection failed")
Embed BaseReconciler for common functionality:
// Update resource status: change the Status field, then call UpdateStatus
device.Status.State = "running"
r.UpdateStatus(ctx, device)
// Emit event for state change
r.EmitEvent(ctx, "io.example.device.ready", device)
// Set Kubernetes-style condition on the resource
r.SetCondition(
device,
"Ready", // type
"True", // status
"DeviceRunning", // reason
"Device is ready", // message
)
The controller uses a work queue for reconciliation requests:
// Enqueue reconciliation request for one resource, identified by kind and UID
request := reconcile.ReconcileRequest{
ResourceKind: "Device",
ResourceUID: "dev-123",
Reason: "Manual trigger",
}
controller.Enqueue(request)
// Enqueue with delay (here: 30 seconds)
controller.EnqueueAfter(request, 30*time.Second)
// Create rate-limited queue backed by an exponential-backoff limiter
limiter := reconcile.NewExponentialBackoffRateLimiter(
1*time.Second, // base delay
5*time.Minute, // max delay
)
queue := reconcile.NewRateLimitedWorkQueue(limiter)
// Failed items are requeued with backoff:
// 1s, 2s, 4s, 8s, 16s, ... up to 5 minutes
The controller automatically reconciles resources when events occur:
// Build the event. NewResourceEvent also returns an error; check it before
// the event is dereferenced below.
event, err := events.NewResourceEvent(
	"io.example.device.created",
	"Device",
	"dev-123",
	device,
)
if err != nil {
	return err
}

// Event published (by value, as in the complete example's main)
eventBus.Publish(ctx, *event)

// Controller automatically:
// 1. Detects event matches registered reconciler
// 2. Enqueues reconciliation request
// 3. Calls Reconcile() with loaded resource
The controller subscribes to event patterns:
// Default: starting the controller subscribes it to all events
controller.Start(ctx)
// Custom: subscribe to specific pattern, here "io.example.device.**"
// (modify controller.go if needed)
eventBus.Subscribe("io.example.device.**", handler)
Ensure eventual consistency with periodic checks:
// Reconcile does the reconciliation work and then always asks to be run again
// after five minutes, so the device is re-checked periodically.
func (r *DeviceReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
device := resource.(*Device)
// Do reconciliation work
...
// Always requeue for periodic check
return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
}
Reconcile related resources:
// Reconcile fans a cluster update out to the cluster's nodes by enqueueing one
// reconciliation request per node UID listed in the cluster's Spec. It does no
// other work and never asks to be requeued itself.
func (r *ClusterReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
	cluster := resource.(*Cluster)

	for _, uid := range cluster.Spec.Nodes {
		// One request per dependent node.
		req := reconcile.ReconcileRequest{
			ResourceKind: "Node",
			ResourceUID:  uid,
			Reason:       "Cluster update",
		}
		r.controller.Enqueue(req)
	}

	var done reconcile.Result // zero value: no requeue
	return done, nil
}
// Reconcile shows three ways of reporting a problem to the controller:
//
//   - transient error: return the error, which is retried automatically with
//     exponential backoff;
//   - permanent error: record a condition and return nil, so nothing is retried;
//   - temporary issue: return nil with RequeueAfter to be run again after a delay.
func (r *DeviceReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
	dev := resource.(*Device)

	var done reconcile.Result // zero value: no requeue requested

	// Transient: surface the connect error so the controller retries.
	err := r.connect(dev)
	if err != nil {
		r.SetCondition(dev, "Ready", "False", "ConnectError", err.Error())
		return done, err
	}

	// Permanent: a missing config cannot fix itself, so do not retry.
	if dev.Spec.Config == nil {
		r.SetCondition(dev, "Ready", "False", "InvalidConfig", "Config is nil")
		return done, nil
	}

	if r.isReady(dev) {
		return done, nil
	}

	// Temporary: not ready yet, look again in ten seconds.
	return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}
Track resource ownership:
// Resource shows the fields used to track ownership: a resource's own kind
// and UID plus the UID of the resource that owns it.
type Resource struct {
Kind string
UID string
OwnerUID string // Parent resource UID
}
// Reconcile deletes a node whose owning cluster can no longer be fetched.
//
// Nodes with an empty OwnerUID are left alone. For owned nodes the owner is
// looked up through r.Client; if that lookup returns an error the node is
// deleted and the result of the delete is returned as the reconcile error.
// No requeue is requested in any case.
func (r *NodeReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
	node := resource.(*Node)

	// Unowned nodes have nothing to check.
	if node.OwnerUID == "" {
		return reconcile.Result{}, nil
	}

	// Only the lookup error matters; the owner value itself is not needed, so
	// it is discarded (an unused variable would not compile).
	// NOTE(review): every lookup error is treated as "owner deleted" — confirm
	// whether the client exposes a not-found error so that transient failures
	// do not delete the node.
	if _, err := r.Client.Get(ctx, "Cluster", node.OwnerUID); err != nil {
		// Owner deleted - delete this node
		return reconcile.Result{}, r.Client.Delete(ctx, "Node", node.UID)
	}

	return reconcile.Result{}, nil
}
package main
import (
"context"
"fmt"
"time"
"github.com/openchami/fabrica/pkg/events"
"github.com/openchami/fabrica/pkg/reconcile"
"github.com/openchami/fabrica/pkg/storage"
)
// Device is the resource definition used by this example. It carries its kind
// and UID together with the desired state (Spec) and the actual state (Status).
type Device struct {
Kind string `json:"kind"`
UID string `json:"uid"`
Spec DeviceSpec `json:"spec"` // desired state
Status DeviceStatus `json:"status"` // actual state
}
// DeviceSpec describes the state the device should be brought to.
type DeviceSpec struct {
Desired string `json:"desired"` // the reconciler acts on "running" and "stopped"
Config string `json:"config"`
}
// DeviceStatus records the state the device was last observed in.
type DeviceStatus struct {
State string `json:"state"` // compared against Spec.Desired by the reconciler
LastSeen string `json:"lastSeen"` // set to an RFC 3339 timestamp when the device is started
Conditions []Condition `json:"conditions"`
}
// Condition is a Kubernetes-style condition entry: a type, its status, and a
// reason and message explaining that status.
type Condition struct {
Type string `json:"type"` // e.g. "Ready"
Status string `json:"status"` // e.g. "True" or "False"
Reason string `json:"reason"` // e.g. "DeviceRunning"
Message string `json:"message"` // e.g. "Device is running"
}
// GetKind returns the kind recorded on the device.
func (d *Device) GetKind() string {
	return d.Kind
}

// GetUID returns the device's UID.
func (d *Device) GetUID() string {
	return d.UID
}
// DeviceReconciler is the reconciler implementation for Device resources. It
// embeds reconcile.BaseReconciler for common functionality.
type DeviceReconciler struct {
reconcile.BaseReconciler
}
// Reconcile moves a device toward the state requested in its Spec.
//
// When the desired state is "running" or "stopped" and the device is not yet
// in it, the status and the Ready condition are changed, UpdateStatus is
// called and a started/stopped event is emitted. Any other desired value
// leaves the device untouched. A requeue after five minutes is always
// requested so the device is checked periodically.
func (r *DeviceReconciler) Reconcile(ctx context.Context, resource interface{}) (reconcile.Result, error) {
	dev := resource.(*Device)
	want, have := dev.Spec.Desired, dev.Status.State

	r.Logger.Infof("Reconciling device %s (desired: %s, current: %s)",
		dev.UID, want, have)

	if want == "running" && have != "running" {
		// Simulate starting device
		r.Logger.Infof("Starting device %s", dev.UID)
		dev.Status.State = "running"
		dev.Status.LastSeen = time.Now().Format(time.RFC3339)
		r.SetCondition(dev, "Ready", "True", "DeviceRunning", "Device is running")
		r.UpdateStatus(ctx, dev)
		r.EmitEvent(ctx, "io.example.device.started", dev)
	} else if want == "stopped" && have != "stopped" {
		// Simulate stopping device
		r.Logger.Infof("Stopping device %s", dev.UID)
		dev.Status.State = "stopped"
		r.SetCondition(dev, "Ready", "False", "DeviceStopped", "Device is stopped")
		r.UpdateStatus(ctx, dev)
		r.EmitEvent(ctx, "io.example.device.stopped", dev)
	}

	// Periodic re-check, whether or not anything changed.
	return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
}
// GetResourceKind reports the resource kind this reconciler handles.
func (r *DeviceReconciler) GetResourceKind() string {
	const kind = "Device"
	return kind
}
// main runs the complete example end to end: it sets up an in-memory event
// bus, storage and controller, registers the DeviceReconciler, saves a stopped
// device whose Spec asks for "running", publishes a "created" event for it,
// and waits two seconds for the reconciliation to happen.
func main() {
	// Set up infrastructure
	eventBus := events.NewInMemoryEventBus(1000, 10)
	eventBus.Start()
	defer eventBus.Close()

	// Named store (not storage) so the imported storage package is not shadowed.
	store := storage.NewInMemoryStorage()
	controller := reconcile.NewController(eventBus, store)

	// Register reconciler
	reconciler := &DeviceReconciler{
		BaseReconciler: reconcile.BaseReconciler{
			EventBus: eventBus,
			Logger:   reconcile.NewDefaultLogger(),
		},
	}
	controller.RegisterReconciler(reconciler)

	// Start controller
	ctx := context.Background()
	controller.Start(ctx)
	defer controller.Stop()

	// Simulate creating a device
	device := &Device{
		Kind: "Device",
		UID:  "dev-123",
		Spec: DeviceSpec{
			Desired: "running",
			Config:  "default",
		},
		Status: DeviceStatus{
			State: "stopped",
		},
	}

	// Store device
	store.Save(ctx, device.Kind, device.UID, device)

	// Publish event to trigger reconciliation. The event is dereferenced
	// below, so a failed construction must not be ignored.
	event, err := events.NewResourceEvent(
		"io.example.device.created",
		"Device",
		device.UID,
		device,
	)
	if err != nil {
		fmt.Printf("creating device event: %v\n", err)
		return
	}
	eventBus.Publish(ctx, *event)

	// Wait for reconciliation
	time.Sleep(2 * time.Second)

	// Output:
	// [INFO] Registered reconciler for Device
	// [INFO] Starting reconciliation controller with 5 workers
	// [INFO] Reconciling device dev-123 (desired: running, current: stopped)
	// [INFO] Starting device dev-123
	// [INFO] Emitted event io.example.device.started for Device/dev-123
}