Persistent Workflows From LlamaIndex
This is such an interesting concept from LlamaIndex: adding persistence to workflows…
To begin with… the term “AI Agent” is not trending as it used to, and there is declining interest in it… a term which has gained ascendancy in its place is Agentic Workflows.
And from LlamaIndex and LangChain there has been an immense focus on Agentic Workflows… where there is a level of agency within the guiding parameters of a workflow or process…
But LlamaIndex rightly states that workflows are ephemeral by nature…
There is usually a trigger event for a workflow to be executed, but once the workflow has run, its state is lost…
So each run of the workflow starts from scratch, with no option to reference history or previous context, either as a whole or on a per-node basis.
But what if we can save context for each node or step within a workflow?
The context is available to any step in the workflow and context can be stored or loaded.
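To make the idea concrete before looking at the LlamaIndex API itself, here is a minimal, purely illustrative sketch of what such a context is: a shared key-value store that every step can read and write, and that can be serialised for storing and loading. `SimpleContext` and `step_one` are hypothetical names for illustration, not part of any library.

```python
import json


# A minimal stand-in for a workflow context: shared state that any step
# can read or write, and that serialises to JSON for store/load.
class SimpleContext:
    def __init__(self, state=None):
        self.state = state or {}

    def to_json(self) -> str:
        return json.dumps(self.state)

    @classmethod
    def from_json(cls, raw: str) -> "SimpleContext":
        return cls(json.loads(raw))


def step_one(ctx: SimpleContext) -> None:
    # A step mutates the shared context rather than local variables
    ctx.state["docs_processed"] = ctx.state.get("docs_processed", 0) + 1


ctx = SimpleContext()
step_one(ctx)
saved = ctx.to_json()                       # "store" the context
restored = SimpleContext.from_json(saved)   # "load" it in a later run
step_one(restored)
print(restored.state["docs_processed"])     # prints 2: state survived the round-trip
```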
LlamaIndex recommends avoiding unnecessary overhead by not automatically taking snapshots of a workflow’s current state, meaning it cannot inherently recover from fatal errors on its own.
That said, any step can integrate with an external database like Redis to manually snapshot the context at critical code sections.
For instance, in a long-running workflow handling hundreds of documents, you could persist the ID of the most recently processed document in the state store.
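A rough sketch of that checkpoint-and-resume pattern, using the stdlib `sqlite3` as a stand-in for an external store like Redis (the names `process_document` and `run_with_checkpoints` are hypothetical, not from any library):

```python
import sqlite3

DOC_IDS = [f"doc-{i}" for i in range(10)]


def process_document(doc_id: str) -> None:
    pass  # the expensive per-document work would happen here


def run_with_checkpoints(db: sqlite3.Connection) -> int:
    """Process documents, checkpointing the last finished ID; returns the resume index."""
    db.execute("CREATE TABLE IF NOT EXISTS ckpt (k TEXT PRIMARY KEY, v TEXT)")
    row = db.execute("SELECT v FROM ckpt WHERE k='last_doc'").fetchone()
    # Resume just after the last checkpointed document, if any
    start = DOC_IDS.index(row[0]) + 1 if row else 0
    for doc_id in DOC_IDS[start:]:
        process_document(doc_id)
        # Critical section: snapshot progress after each document
        db.execute("INSERT OR REPLACE INTO ckpt VALUES ('last_doc', ?)", (doc_id,))
        db.commit()
    return start


db = sqlite3.connect(":memory:")
run_with_checkpoints(db)         # first run starts at index 0
print(run_with_checkpoints(db))  # a "restart" finds the checkpoint and skips all ten: prints 10
```

On a real crash-and-restart, the second call would pick up exactly where the first one stopped, instead of reprocessing the whole corpus.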
This approach is strongly reminiscent of research from OpenAI which focused on treating RAG chunks as time-aware micro-memory stores.
Below is a practical code example…
```
!pip install llama-index-workflows
```

```python
from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent


class MyWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        self.counter = 0
        super().__init__(*args, **kwargs)

    @step
    def count(self, ev: StartEvent) -> StopEvent:
        self.counter += 1
        return StopEvent(result=f"The step ran {self.counter} times")


w = MyWorkflow()
for _ in range(3):
    print(await w.run())
```

The output:

```
The step ran 1 times
The step ran 2 times
The step ran 3 times
```

```python
from workflows import Workflow, step, Context
from workflows.events import StartEvent, StopEvent


class MyWorkflow(Workflow):
    @step
    async def count(self, ctx: Context, ev: StartEvent) -> StopEvent:
        async with ctx.store.edit_state() as state:
            counter = state.get("counter", 1)
            retval = StopEvent(result=f"The step ran {counter} times")
            state["counter"] = counter + 1
        return retval


w = MyWorkflow()
handler = w.run()
print(await handler)

w = MyWorkflow()
handler = w.run(ctx=handler.ctx)
print(await handler)
```

The output:

```
The step ran 1 times
The step ran 2 times
```

```python
import sqlite3
import json
from typing import Annotated

from workflows import Workflow, step, Context
from workflows.events import StartEvent, StopEvent
from workflows.resource import Resource
from workflows.context import JsonSerializer


def get_db() -> sqlite3.Connection:
    return sqlite3.connect("mydb.db")


class MyWorkflow(Workflow):
    @step
    async def count(
        self,
        ctx: Context,
        ev: StartEvent,
        db: Annotated[sqlite3.Connection, Resource(get_db)],
    ) -> StopEvent:
        async with ctx.store.edit_state() as state:
            counter = state.get("counter", 1)
            retval = StopEvent(result=f"The step ran {counter} times")
            state["counter"] = counter + 1

        cursor = db.cursor()
        ctx_dict = ctx.to_dict(serializer=JsonSerializer())
        cursor.execute(
            "INSERT OR REPLACE INTO state VALUES (?, ?)",
            ("last_ctx", json.dumps(ctx_dict)),
        )
        db.commit()
        return retval


# Create a simple key-value table
db = get_db()
db.cursor().execute(
    "CREATE TABLE IF NOT EXISTS state (key TEXT PRIMARY KEY, value TEXT)"
)
db.commit()

w = MyWorkflow()
print(await w.run())

# State is stored in a DB now, we could restart the process here...
w = MyWorkflow()
cursor = db.cursor()
cursor.execute("SELECT value FROM state WHERE key=?", ("last_ctx",))
ctx_json = cursor.fetchone()[0]
restored_ctx = Context.from_dict(w, json.loads(ctx_json), serializer=JsonSerializer())
print(await w.run(ctx=restored_ctx))
```

The output:

```
The step ran 1 times
The step ran 2 times
```
