Persistence Guide¶
The pkg/persistence package provides database integration and state persistence capabilities for GoLangGraph.
Overview¶
The persistence package enables: - State Checkpointing: Save and restore workflow states - Database Integration: Support for PostgreSQL and Redis - Session Management: Thread-safe session handling - Memory Storage: In-memory persistence for development
Checkpointing¶
Memory Checkpointer¶
For development and testing, use the in-memory checkpointer:
import "github.com/piotrlaczkowski/GoLangGraph/pkg/persistence"
// Create memory checkpointer
checkpointer := persistence.NewMemoryCheckpointer()
// Save a checkpoint
checkpoint := &persistence.Checkpoint{
ID: "checkpoint_1",
ThreadID: "thread_1",
State: state, // *core.BaseState
Metadata: map[string]interface{}{
"step": "processing",
},
CreatedAt: time.Now(),
NodeID: "current_node",
StepID: 1,
}
err := checkpointer.Save(context.Background(), checkpoint)
if err != nil {
log.Printf("Failed to save checkpoint: %v", err)
}
// Load a checkpoint
loaded, err := checkpointer.Load(context.Background(), "thread_1", "checkpoint_1")
if err != nil {
log.Printf("Failed to load checkpoint: %v", err)
}
// List checkpoints for a thread
checkpoints, err := checkpointer.List(context.Background(), "thread_1")
if err != nil {
log.Printf("Failed to list checkpoints: %v", err)
}
for _, meta := range checkpoints {
fmt.Printf("Checkpoint: %s at %v\n", meta.ID, meta.CreatedAt)
}
// Delete a checkpoint
err = checkpointer.Delete(context.Background(), "thread_1", "checkpoint_1")
if err != nil {
log.Printf("Failed to delete checkpoint: %v", err)
}
Database Integration¶
For production use, integrate with databases:
// Database configuration
config := &persistence.DatabaseConfig{
Type: persistence.DatabaseTypePostgres,
Host: "localhost",
Port: 5432,
Database: "golanggraph",
Username: "user",
Password: "password",
SSLMode: "disable",
}
// Create database connection
db, err := persistence.NewDatabaseConnection(config)
if err != nil {
log.Fatal("Failed to connect to database:", err)
}
defer db.Close()
// Create database checkpointer
checkpointer, err := persistence.NewDatabaseCheckpointer(db)
if err != nil {
log.Fatal("Failed to create database checkpointer:", err)
}
// Use the same API as memory checkpointer
err = checkpointer.Save(context.Background(), checkpoint)
Session Management¶
Manage conversation threads and sessions:
// Create session manager
sessionManager := persistence.NewSessionManager(checkpointer)
// Create a new session
session, err := sessionManager.CreateSession(context.Background(), &persistence.SessionConfig{
UserID: "user_123",
AgentID: "agent_456",
Metadata: map[string]interface{}{
"type": "chat",
},
})
if err != nil {
log.Printf("Failed to create session: %v", err)
}
// Get session
session, err = sessionManager.GetSession(context.Background(), session.ID)
if err != nil {
log.Printf("Failed to get session: %v", err)
}
// Create a thread within the session
thread, err := sessionManager.CreateThread(context.Background(), session.ID, &persistence.ThreadConfig{
Name: "Main Conversation",
Metadata: map[string]interface{}{
"topic": "AI assistance",
},
})
if err != nil {
log.Printf("Failed to create thread: %v", err)
}
// List threads in a session
threads, err := sessionManager.ListThreads(context.Background(), session.ID)
if err != nil {
log.Printf("Failed to list threads: %v", err)
}
Integration with Agents¶
Use persistence with agents for stateful conversations:
// Create agent with persistence
config := &agent.AgentConfig{
Name: "persistent-agent",
Type: agent.AgentTypeChat,
Model: "gemma3:1b",
Provider: "ollama",
SystemPrompt: "You are a helpful assistant with memory.",
}
// Create agent
chatAgent := agent.NewAgent(config, llmManager, toolRegistry)
// Set up persistence
checkpointer := persistence.NewMemoryCheckpointer()
sessionManager := persistence.NewSessionManager(checkpointer)
// Create session for user
session, err := sessionManager.CreateSession(context.Background(), &persistence.SessionConfig{
UserID: "user_123",
AgentID: config.Name,
})
if err != nil {
log.Fatal(err)
}
// Execute with session context
ctx := context.WithValue(context.Background(), "session_id", session.ID)
execution, err := chatAgent.Execute(ctx, "Hello! Remember that I like Go programming.")
if err != nil {
log.Fatal(err)
}
// Save checkpoint after execution
checkpoint := &persistence.Checkpoint{
ID: fmt.Sprintf("checkpoint_%d", time.Now().Unix()),
ThreadID: session.ID,
State: execution.State, // Assuming execution returns state
CreatedAt: time.Now(),
}
err = checkpointer.Save(ctx, checkpoint)
if err != nil {
log.Printf("Failed to save checkpoint: %v", err)
}
Configuration¶
Environment Variables¶
Configure persistence using environment variables:
# PostgreSQL
export POSTGRES_HOST=localhost
export POSTGRES_PORT=5432
export POSTGRES_DB=golanggraph
export POSTGRES_USER=user
export POSTGRES_PASSWORD=password
export POSTGRES_SSLMODE=disable
# Redis
export REDIS_HOST=localhost
export REDIS_PORT=6379
export REDIS_PASSWORD=
export REDIS_DB=0
Configuration Struct¶
type DatabaseConfig struct {
Type DatabaseType `json:"type"`
Host string `json:"host"`
Port int `json:"port"`
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
SSLMode string `json:"ssl_mode"`
}
// Load from environment
config := &persistence.DatabaseConfig{
Type: persistence.DatabaseTypePostgres,
Host: os.Getenv("POSTGRES_HOST"),
Port: 5432, // or parse from env
Database: os.Getenv("POSTGRES_DB"),
Username: os.Getenv("POSTGRES_USER"),
Password: os.Getenv("POSTGRES_PASSWORD"),
SSLMode: os.Getenv("POSTGRES_SSLMODE"),
}
Best Practices¶
1. Checkpoint Management¶
- Save checkpoints at logical workflow boundaries
- Use descriptive checkpoint IDs
- Clean up old checkpoints periodically
- Include relevant metadata for debugging
2. Session Management¶
- Create sessions per user or conversation
- Use threads to organize related interactions
- Store user preferences in session metadata
- Implement session expiration policies
3. Error Handling¶
- Always handle database connection errors
- Implement retry logic for transient failures
- Log persistence operations for debugging
- Validate data before saving
4. Performance¶
- Use connection pooling for database connections
- Implement caching for frequently accessed data
- Batch operations when possible
- Monitor database performance
Examples¶
Simple Checkpointing¶
func saveWorkflowProgress(checkpointer persistence.Checkpointer, threadID string, state *core.BaseState) error {
checkpoint := &persistence.Checkpoint{
ID: fmt.Sprintf("progress_%d", time.Now().Unix()),
ThreadID: threadID,
State: state,
CreatedAt: time.Now(),
}
return checkpointer.Save(context.Background(), checkpoint)
}
func loadWorkflowProgress(checkpointer persistence.Checkpointer, threadID, checkpointID string) (*core.BaseState, error) {
checkpoint, err := checkpointer.Load(context.Background(), threadID, checkpointID)
if err != nil {
return nil, err
}
return checkpoint.State, nil
}
Session-based Chat¶
func handleChatMessage(sessionManager *persistence.SessionManager, userID, message string) (string, error) {
// Get or create session
sessions, err := sessionManager.ListSessions(context.Background(), userID)
if err != nil {
return "", err
}
var session *persistence.Session
if len(sessions) == 0 {
session, err = sessionManager.CreateSession(context.Background(), &persistence.SessionConfig{
UserID: userID,
})
if err != nil {
return "", err
}
} else {
session = sessions[0] // Use most recent session
}
// Process message with session context
ctx := context.WithValue(context.Background(), "session_id", session.ID)
// Execute agent (implementation depends on your agent setup)
response := processMessage(ctx, message)
return response, nil
}
This persistence package provides the foundation for building stateful AI applications with GoLangGraph, enabling you to maintain conversation history, save workflow progress, and integrate with production databases.