Skip to content

Persistence Package Documentation

The pkg/persistence package provides comprehensive database integration and state persistence capabilities for GoLangGraph, including support for traditional databases, vector databases, and checkpointing systems.

Overview

The persistence package enables:

- Database Connections: Support for PostgreSQL, Redis, and vector databases
- State Checkpointing: Save and restore workflow states
- Vector Storage: RAG (Retrieval-Augmented Generation) capabilities
- Document Management: Store and search documents with embeddings
- Session Management: Thread-safe session and conversation handling

Supported Databases

PostgreSQL

Full-featured relational database support with advanced features:

- JSON/JSONB support for flexible data storage
- Connection pooling and transaction management
- Schema migrations and versioning
- Full-text search capabilities

Redis

High-performance in-memory data store:

- Key-value storage with expiration
- Pub/Sub messaging for real-time updates
- Caching layer for improved performance
- Session storage and management

pgvector

Vector database capabilities for AI applications:

- High-dimensional vector storage
- Similarity search with multiple distance metrics
- Embedding storage and retrieval
- RAG (Retrieval-Augmented Generation) support

Database Configuration

PostgreSQL Configuration

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/piotrlaczkowski/GoLangGraph/pkg/persistence"
)

// Basic PostgreSQL configuration
pgConfig := persistence.PostgreSQLConfig{
    Host:     "localhost",
    Port:     5432,
    Database: "golanggraph",
    Username: "user",
    Password: "password",
    SSLMode:  "disable",
}

// Advanced configuration with connection pooling
pgConfig = persistence.PostgreSQLConfig{
    Host:            "localhost",
    Port:            5432,
    Database:        "golanggraph",
    Username:        "user",
    Password:        "password",
    SSLMode:         "require",
    MaxConnections:  25,
    MaxIdleConns:    5,
    ConnMaxLifetime: 30 * time.Minute,
    ConnMaxIdleTime: 5 * time.Minute,
}

// Validate configuration
if err := pgConfig.Validate(); err != nil {
    log.Fatal("Invalid PostgreSQL config:", err)
}

Redis Configuration

// Basic Redis configuration
redisConfig := persistence.RedisConfig{
    Host:     "localhost",
    Port:     6379,
    Password: "",
    Database: 0,
}

// Advanced configuration with connection pooling, retries, and timeouts
// (only pool, retry, and timeout fields are set here; nothing cluster-specific)
redisConfig = persistence.RedisConfig{
    Host:           "localhost",
    Port:           6379,
    Password:       "secure_password",
    Database:       0,
    PoolSize:       10,
    MinIdleConns:   3,
    MaxRetries:     3,
    DialTimeout:    5 * time.Second,
    ReadTimeout:    3 * time.Second,
    WriteTimeout:   3 * time.Second,
    PoolTimeout:    4 * time.Second,
    IdleTimeout:    5 * time.Minute,
}

// Validate configuration
if err := redisConfig.Validate(); err != nil {
    log.Fatal("Invalid Redis config:", err)
}

pgvector Configuration

// pgvector configuration for RAG applications
pgvectorConfig := persistence.PgVectorConfig{
    Host:       "localhost",
    Port:       5432,
    Database:   "vectordb",
    Username:   "vector_user",
    Password:   "vector_password",
    SSLMode:    "disable",
    Dimensions: 1536, // OpenAI embedding dimensions

    // Vector-specific settings
    IndexType:    "ivfflat",
    IndexOptions: map[string]interface{}{
        "lists": 100,
    },
    DistanceMetric: "cosine",
}

// Validate configuration
if err := pgvectorConfig.Validate(); err != nil {
    log.Fatal("Invalid pgvector config:", err)
}

Database Manager

The DatabaseManager provides centralized database connection management:

// Create database manager
dbManager := persistence.NewDatabaseManager()

// Add database connections
err := dbManager.AddPostgreSQL("main", pgConfig)
if err != nil {
    log.Fatal("Failed to add PostgreSQL:", err)
}

err = dbManager.AddRedis("cache", redisConfig)
if err != nil {
    log.Fatal("Failed to add Redis:", err)
}

err = dbManager.AddPgVector("vectors", pgvectorConfig)
if err != nil {
    log.Fatal("Failed to add pgvector:", err)
}

// Get connections
pgConn, err := dbManager.GetPostgreSQL("main")
if err != nil {
    log.Fatal("Failed to get PostgreSQL connection:", err)
}

redisConn, err := dbManager.GetRedis("cache")
if err != nil {
    log.Fatal("Failed to get Redis connection:", err)
}

vectorConn, err := dbManager.GetPgVector("vectors")
if err != nil {
    log.Fatal("Failed to get pgvector connection:", err)
}

// Health checks
healthStatus := dbManager.HealthCheck()
for name, healthy := range healthStatus {
    if healthy {
        fmt.Printf("Database %s: healthy\n", name)
    } else {
        fmt.Printf("Database %s: unhealthy\n", name)
    }
}

// Close all connections
defer dbManager.Close()

Checkpointing System

The checkpointing system allows you to save and restore workflow states:

Database Checkpointer

// Create database checkpointer
checkpointer, err := persistence.NewDatabaseCheckpointer(dbManager, "main")
if err != nil {
    log.Fatal("Failed to create checkpointer:", err)
}

// Save a checkpoint
checkpoint := &persistence.Checkpoint{
    ThreadID:    "conversation-123",
    State:       state,
    Metadata:    map[string]interface{}{
        "user_id":    "user123",
        "session_id": "session456",
        "step":       "processing",
    },
    Timestamp:   time.Now(),
    Version:     1,
}

err = checkpointer.SaveCheckpoint(checkpoint)
if err != nil {
    log.Fatal("Failed to save checkpoint:", err)
}

// Load a checkpoint
loadedCheckpoint, err := checkpointer.LoadCheckpoint("conversation-123")
if err != nil {
    log.Fatal("Failed to load checkpoint:", err)
}

// List checkpoints for a thread
checkpoints, err := checkpointer.ListCheckpoints("conversation-123")
if err != nil {
    log.Fatal("Failed to list checkpoints:", err)
}

// Delete old checkpoints
err = checkpointer.DeleteCheckpoint("conversation-123", 1)
if err != nil {
    log.Fatal("Failed to delete checkpoint:", err)
}

Memory Checkpointer

// Create in-memory checkpointer (for testing/development)
memCheckpointer := persistence.NewMemoryCheckpointer()

// Use the same interface as database checkpointer
err = memCheckpointer.SaveCheckpoint(checkpoint)
if err != nil {
    log.Fatal("Failed to save checkpoint:", err)
}

// Load the checkpoint back. The result gets its own name so that := always
// introduces a new variable: this snippet continues the database example
// (it reuses err and checkpoint), where loadedCheckpoint is already declared,
// and re-declaring it with := would not compile.
memCheckpoint, err := memCheckpointer.LoadCheckpoint("conversation-123")
if err != nil {
    log.Fatal("Failed to load checkpoint:", err)
}

Vector Database & RAG Support

Document Storage

// Create vector store
vectorStore, err := persistence.NewPgVectorStore(pgvectorConfig)
if err != nil {
    log.Fatal("Failed to create vector store:", err)
}

// Define documents
documents := []persistence.Document{
    {
        ID:      "doc1",
        Content: "GoLangGraph is a powerful framework for building AI agent workflows.",
        Metadata: map[string]interface{}{
            "source":    "documentation",
            "category":  "framework",
            "timestamp": time.Now(),
        },
        Embedding: []float32{0.1, 0.2, 0.3, /* ... 1536 dimensions */},
    },
    {
        ID:      "doc2",
        Content: "The persistence package provides database integration capabilities.",
        Metadata: map[string]interface{}{
            "source":    "documentation",
            "category":  "persistence",
            "timestamp": time.Now(),
        },
        Embedding: []float32{0.2, 0.3, 0.4, /* ... 1536 dimensions */},
    },
}

// Store documents
err = vectorStore.StoreDocuments(documents)
if err != nil {
    log.Fatal("Failed to store documents:", err)
}

// Update a document
updatedDoc := persistence.Document{
    ID:      "doc1",
    Content: "GoLangGraph is an advanced framework for building AI agent workflows with persistence.",
    Metadata: map[string]interface{}{
        "source":    "documentation",
        "category":  "framework",
        "updated":   time.Now(),
    },
    Embedding: []float32{0.15, 0.25, 0.35, /* ... 1536 dimensions */},
}

err = vectorStore.UpdateDocument(updatedDoc)
if err != nil {
    log.Fatal("Failed to update document:", err)
}

Similarity Search

// Search by embedding vector
queryEmbedding := []float32{0.1, 0.2, 0.3, /* ... 1536 dimensions */}
results, err := vectorStore.SimilaritySearchByVector(queryEmbedding, 5)
if err != nil {
    log.Fatal("Failed to search by vector:", err)
}

// Search by text (requires embedding generation)
textResults, err := vectorStore.SimilaritySearch("AI agent workflows", 5)
if err != nil {
    log.Fatal("Failed to search by text:", err)
}

// Process results
for _, result := range results {
    fmt.Printf("Document ID: %s\n", result.ID)
    fmt.Printf("Content: %s\n", result.Content)
    fmt.Printf("Similarity Score: %.4f\n", result.Score)
    fmt.Printf("Metadata: %+v\n", result.Metadata)
    fmt.Println("---")
}

Advanced Search with Filters

// Search with metadata filters
filters := map[string]interface{}{
    "category": "framework",
    "source":   "documentation",
}

filteredResults, err := vectorStore.SimilaritySearchWithFilters(
    queryEmbedding, 
    5, 
    filters,
)
if err != nil {
    log.Fatal("Failed to search with filters:", err)
}

// Search with score threshold
thresholdResults, err := vectorStore.SimilaritySearchWithThreshold(
    queryEmbedding,
    5,
    0.8, // Minimum similarity score
)
if err != nil {
    log.Fatal("Failed to search with threshold:", err)
}

Session and Thread Management

Session Management

// Create session manager
sessionManager := persistence.NewSessionManager(dbManager, "main")

// Create a new session
session := &persistence.Session{
    ID:        "session-123",
    UserID:    "user-456",
    Metadata:  map[string]interface{}{
        "app_version": "1.0.0",
        "user_agent":  "GoLangGraph-Client/1.0",
    },
    CreatedAt: time.Now(),
    UpdatedAt: time.Now(),
}

err = sessionManager.CreateSession(session)
if err != nil {
    log.Fatal("Failed to create session:", err)
}

// Get session
retrievedSession, err := sessionManager.GetSession("session-123")
if err != nil {
    log.Fatal("Failed to get session:", err)
}

// Update session
retrievedSession.Metadata["last_activity"] = time.Now()
err = sessionManager.UpdateSession(retrievedSession)
if err != nil {
    log.Fatal("Failed to update session:", err)
}

// List user sessions
userSessions, err := sessionManager.ListUserSessions("user-456")
if err != nil {
    log.Fatal("Failed to list user sessions:", err)
}

// Delete session
err = sessionManager.DeleteSession("session-123")
if err != nil {
    log.Fatal("Failed to delete session:", err)
}

Thread Management

// Create thread manager
threadManager := persistence.NewThreadManager(dbManager, "main")

// Create a new thread
thread := &persistence.Thread{
    ID:        "thread-789",
    SessionID: "session-123",
    UserID:    "user-456",
    Title:     "AI Workflow Discussion",
    Metadata:  map[string]interface{}{
        "topic":    "workflow_design",
        "priority": "high",
    },
    CreatedAt: time.Now(),
    UpdatedAt: time.Now(),
}

err = threadManager.CreateThread(thread)
if err != nil {
    log.Fatal("Failed to create thread:", err)
}

// Get thread
retrievedThread, err := threadManager.GetThread("thread-789")
if err != nil {
    log.Fatal("Failed to get thread:", err)
}

// List session threads
sessionThreads, err := threadManager.ListSessionThreads("session-123")
if err != nil {
    log.Fatal("Failed to list session threads:", err)
}

// Update thread
retrievedThread.Title = "Updated AI Workflow Discussion"
err = threadManager.UpdateThread(retrievedThread)
if err != nil {
    log.Fatal("Failed to update thread:", err)
}

// Delete thread
err = threadManager.DeleteThread("thread-789")
if err != nil {
    log.Fatal("Failed to delete thread:", err)
}

Advanced Features

Connection Pooling

// Configure connection pooling for PostgreSQL
pgConfig := persistence.PostgreSQLConfig{
    // ... basic config
    MaxConnections:  25,              // Maximum number of connections
    MaxIdleConns:    5,               // Maximum idle connections
    ConnMaxLifetime: 30 * time.Minute, // Connection lifetime
    ConnMaxIdleTime: 5 * time.Minute,  // Idle connection timeout
}

// Configure connection pooling for Redis
redisConfig := persistence.RedisConfig{
    // ... basic config
    PoolSize:     10,               // Connection pool size
    MinIdleConns: 3,                // Minimum idle connections
    PoolTimeout:  4 * time.Second,  // Pool timeout
    IdleTimeout:  5 * time.Minute,  // Idle connection timeout
}

Transaction Management

// Begin transaction
tx, err := pgConn.BeginTx(context.Background(), nil)
if err != nil {
    log.Fatal("Failed to begin transaction:", err)
}
defer tx.Rollback() // Rollback if not committed

// Perform operations within transaction
_, err = tx.ExecContext(context.Background(), 
    "INSERT INTO checkpoints (thread_id, state_data) VALUES ($1, $2)",
    "thread-123", stateData)
if err != nil {
    log.Fatal("Failed to insert checkpoint:", err)
}

_, err = tx.ExecContext(context.Background(),
    "UPDATE sessions SET updated_at = $1 WHERE id = $2",
    time.Now(), "session-123")
if err != nil {
    log.Fatal("Failed to update session:", err)
}

// Commit transaction
err = tx.Commit()
if err != nil {
    log.Fatal("Failed to commit transaction:", err)
}

Batch Operations

// Batch insert documents
batchSize := 100
documents := make([]persistence.Document, 1000)
// ... populate documents

// Walk the slice in windows of batchSize; the last window may be shorter.
for start := 0; start < len(documents); start += batchSize {
    stop := min(start+batchSize, len(documents))

    if err := vectorStore.StoreDocuments(documents[start:stop]); err != nil {
        // A failed batch is logged and skipped so later batches still run.
        log.Printf("Failed to store batch %d-%d: %v", start, stop, err)
        continue
    }

    fmt.Printf("Stored batch %d-%d\n", start, stop)
}

Monitoring and Metrics

// Database health monitoring
go func() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            health := dbManager.HealthCheck()
            for name, healthy := range health {
                if !healthy {
                    log.Printf("Database %s is unhealthy", name)
                    // Trigger alerts or recovery procedures
                }
            }
        }
    }
}()

// Connection pool monitoring
stats := pgConn.Stats()
fmt.Printf("Open connections: %d\n", stats.OpenConnections)
fmt.Printf("In use: %d\n", stats.InUse)
fmt.Printf("Idle: %d\n", stats.Idle)

Testing

The persistence package includes comprehensive tests:

# Run all persistence tests
go test ./pkg/persistence -v

# Run specific test
go test ./pkg/persistence -v -run TestDatabaseManager

# Run integration tests (requires running databases)
go test ./pkg/persistence -v -tags=integration

Example Test

// TestDatabaseCheckpointer saves one checkpoint through the database-backed
// checkpointer, loads it back by thread ID, and checks that both the thread ID
// and the stored state value survive the round trip.
func TestDatabaseCheckpointer(t *testing.T) {
    const (
        threadID   = "test-thread"
        stateKey   = "test_key"
        stateValue = "test_value"
    )

    // Setup test database
    manager := setupTestDatabase(t)
    defer manager.Close()

    // Create checkpointer
    cp, err := persistence.NewDatabaseCheckpointer(manager, "test")
    require.NoError(t, err)

    // Create test checkpoint
    initial := core.NewBaseState()
    initial.Set(stateKey, stateValue)

    saved := &persistence.Checkpoint{
        ThreadID:  threadID,
        State:     initial,
        Metadata:  map[string]interface{}{"test": true},
        Timestamp: time.Now(),
        Version:   1,
    }

    // Save checkpoint
    require.NoError(t, cp.SaveCheckpoint(saved))

    // Load checkpoint
    got, err := cp.LoadCheckpoint(threadID)
    require.NoError(t, err)
    require.Equal(t, saved.ThreadID, got.ThreadID)

    // Verify state
    restored, found := got.State.Get(stateKey)
    require.True(t, found)
    require.Equal(t, stateValue, restored)
}

Best Practices

1. Connection Management

// ✅ Good: Use connection pooling
dbManager := persistence.NewDatabaseManager()
defer dbManager.Close() // Always close connections

// ❌ Bad: Creating new connections for each operation
// This leads to connection leaks and poor performance

2. Error Handling

// ✅ Good: Handle specific database errors
err := checkpointer.SaveCheckpoint(checkpoint)
if err != nil {
    var pgErr *pq.Error
    if errors.As(err, &pgErr) {
        switch pgErr.Code {
        case "23505": // Unique violation
            return fmt.Errorf("checkpoint already exists: %w", err)
        case "23503": // Foreign key violation
            return fmt.Errorf("invalid thread reference: %w", err)
        default:
            return fmt.Errorf("database error: %w", err)
        }
    }
    return fmt.Errorf("failed to save checkpoint: %w", err)
}

3. Resource Cleanup

// ✅ Good: Always clean up resources
//
// processWithTransaction opens a transaction on db, performs its work inside
// it, and commits. It returns the error from Begin if the transaction cannot
// be started, otherwise the result of Commit.
func processWithTransaction(db *sql.DB) error {
    txn, beginErr := db.Begin()
    if beginErr != nil {
        return beginErr
    }
    // Safety net for every early return added to the body below. Once Commit
    // has completed, this call only reports sql.ErrTxDone, which is ignored.
    defer txn.Rollback()

    // ... perform operations

    commitErr := txn.Commit()
    return commitErr
}

4. Configuration Validation

// ✅ Good: Validate configuration before use
if err := config.Validate(); err != nil {
    return fmt.Errorf("invalid configuration: %w", err)
}

// ❌ Bad: Using configuration without validation
// This can lead to runtime errors

Performance Optimization

1. Connection Pooling

// Optimize connection pool settings based on your workload
pgConfig := persistence.PostgreSQLConfig{
    MaxConnections:  25,              // Based on database limits
    MaxIdleConns:    5,               // Keep some connections ready
    ConnMaxLifetime: 30 * time.Minute, // Prevent stale connections
    ConnMaxIdleTime: 5 * time.Minute,  // Clean up idle connections
}

2. Batch Operations

// Process documents in batches for better performance
const batchSize = 100
for i := 0; i < len(documents); i += batchSize {
    batch := documents[i:min(i+batchSize, len(documents))]
    if err := vectorStore.StoreDocuments(batch); err != nil {
        log.Printf("Batch failed: %v", err)
    }
}

3. Indexing

// Create appropriate indexes for your queries
queries := []string{
    "CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_id ON checkpoints(thread_id)",
    "CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)",
    "CREATE INDEX IF NOT EXISTS idx_documents_metadata ON documents USING GIN(metadata)",
}

for _, query := range queries {
    if _, err := db.Exec(query); err != nil {
        log.Printf("Failed to create index: %v", err)
    }
}

Conclusion

The persistence package provides a comprehensive solution for data storage and retrieval in GoLangGraph applications. With support for multiple database types, advanced features like vector search, and robust error handling, it enables building production-ready AI applications with reliable data persistence.