Building a Local-First Agent Framework in Rust (Part 14): Recording the Loop
See Part 0 for the latest table of contents and sample code. New chapters will be added over time.
Chapter 14: Recording the Loop: Events as JSONL
Chapter 13 gave each run a place to live. A session now has a creation time, an id, and a directory under .abcb/sessions/<session-id>/.
That solved one half of persistence. We know where a run belongs. But the directory is still mostly empty. If the model gives a bad answer, if a tool runs, if the loop recovers from an error, or if the final answer looks suspicious, there is not yet a durable record of what happened inside that run.
This chapter fills that gap. The agent loop now writes an events.jsonl file that records the important facts of the run. The file lives inside the session directory:
.abcb/
  sessions/
    sess-1748613022123/
      events.jsonl
The framework idea is simple: the loop should leave a trail as it runs. Not a polished summary. Not an interpretation. Just facts in order.
The Rust idea is also a continuation of what we have been doing. In Chapter 5, write_event accepted &mut impl Write, so event formatting was separated from file location. In this chapter, that same writer is threaded through run_loop and run_step. The loop does not know whether events are going to a real file, a test buffer, or nowhere at all. It only knows that it can write.
The sample code for this chapter is in chapter14/abcb/.
14.1 Events Become a Run Log
We already introduced Event in Chapter 5, but the agent loop was not using it yet. chat --log could write a small three-line log for a one-turn chat, but run was the more interesting command. It is the command that can call tools, recover from model mistakes, and repeat until it gets a final answer.
For that run path, the event enum gains one new variant, ToolResult:
File: abcb/crates/abcb-core/src/lib.rs
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Event {
    UserMessage { content: String },
    ModelResponse { content: String },
    ToolResult { tool_name: String, output: String },
    FinalAnswer { content: String },
}
ToolResult matters because a tool call is not just a message. It is a concrete thing that happened outside the model. The model asked for a tool. The framework looked it up. The tool returned output. That output was then fed back into the session as a Role::Tool message.
The event log should preserve that boundary. If we only recorded model responses and final answers, a reader of the log would have to infer whether a tool actually ran. With ToolResult, the log says it plainly:
{"at":1780000000000,"kind":"tool_result","tool_name":"stub_echo","output":"pong"}
In this chapter, we are not trying to make the prettiest possible log. We are trying to make a durable event stream that a later command can replay, summarize, or inspect.
14.2 A Timestamped Wrapper
There is one more change to the log shape. In Chapter 5, each line was just an Event. Now each line is a LoggedEvent:
File: abcb/crates/abcb-core/src/lib.rs
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct LoggedEvent {
    #[serde(default)]
    pub at: u64,
    #[serde(flatten)]
    pub event: Event,
}
Event remains the fact the loop wants to record. It says what happened, but it does not say when it was written. The loop can construct this value without reading the clock:
Event::ModelResponse {
    content: reply.content.clone(),
}
LoggedEvent is the record as it appears in the file. It wraps that event and adds the timestamp:
{"at":1780000000000,"kind":"model_response","content":"..."}
This is the same split we used for sessions in Chapter 13. Session::new(id, created_at) was pure. Session::start() was the small place where the program read the clock. Here, Event is pure. write_event is the small place where the event receives a timestamp.
The interesting serde attribute is:
#[serde(flatten)]
pub event: Event,
Without flatten, the JSON would look nested:
{"at":1780000000000,"event":{"kind":"model_response","content":"..."}}
With flatten, serde merges the fields of event into the outer object:
{"at":1780000000000,"kind":"model_response","content":"..."}
That shape is easier to scan in a JSONL file. Each line starts with a timestamp and a kind, then the fields for that specific event.
The other attribute is:
#[serde(default)]
pub at: u64,
This lets older event lines still parse even if they do not have an at field. The default value for u64 is 0, so a pre-timestamp log line becomes a LoggedEvent with at == 0.
That is a small example of schema evolution. We changed the file shape, but we did not force every old log to be migrated before the reader can parse it.
14.3 Writing One Event
write_event now wraps the event before serializing it:
File: abcb/crates/abcb-core/src/lib.rs
pub fn write_event(writer: &mut impl Write, event: &Event) -> Result<(), EventLogError> {
    let record = LoggedEvent {
        at: now_millis(),
        event: event.clone(),
    };
    serde_json::to_writer(&mut *writer, &record)?;
    writer.write_all(b"\n")?;
    Ok(())
}
The function still accepts:
writer: &mut impl Write
That means the caller lends write_event something writable. In production it can be a BufWriter<File>. In tests it can be a Vec<u8>, because Rust implements Write for byte vectors. If a caller wants to ignore event output, it can pass a mutable sink at the call site:
&mut io::sink()
Rust: io::sink()
io::sink() returns a writer that accepts bytes and throws them away. It is useful in tests that need to call a function requiring &mut impl Write, but do not care about the output. It is similar in spirit to writing to /dev/null, but it is just a Rust value.
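To make those destinations concrete, here is a minimal sketch. write_line is a hypothetical stand-in for write_event, not the chapter's code. It only shows that one function body can serve a Vec<u8>, io::sink(), or any other value that implements Write.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for `write_event`: writes one newline-terminated line.
fn write_line(writer: &mut impl Write, text: &str) -> io::Result<()> {
    writer.write_all(text.as_bytes())?;
    writer.write_all(b"\n")
}

/// Writes two lines into an in-memory buffer and returns the bytes as text.
fn capture_two_lines() -> String {
    let mut buf: Vec<u8> = Vec::new();
    write_line(&mut buf, "first").expect("writing to a Vec<u8> should succeed");
    write_line(&mut buf, "second").expect("writing to a Vec<u8> should succeed");
    String::from_utf8(buf).expect("utf8")
}

fn main() -> io::Result<()> {
    // Same function, but the output is discarded.
    write_line(&mut io::sink(), "ignored")?;
    // Same function, but the output is captured in memory.
    print!("{}", capture_two_lines());
    Ok(())
}
```

A BufWriter<File> would slot into the same call without any change to write_line.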
The event itself is passed by reference:
event: &Event
But LoggedEvent needs to own its event field, so the function clones it:
event: event.clone()
That copy is acceptable here. Events are small, and this keeps the call sites simple. In the current implementation, the caller builds an event, passes a reference to write_event, and the function clones it into the timestamped record.
The timestamp is stamped here:
at: now_millis()
This uses the same private now_millis helper that Session::start() used in Chapter 13. This is the impure part. It reads the clock. By putting it at the write boundary, the loop can still build ordinary Event values without depending on time.
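The body of now_millis is not shown in this chapter. A plausible shape, assuming it returns milliseconds since the Unix epoch, might look like the sketch below. The fallback to 0 is an assumption made for this sketch, chosen so that a broken clock never panics.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Assumed shape of the private helper: milliseconds since the Unix epoch.
/// Falls back to 0 if the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}

fn main() {
    // Two reads of the wall clock. They are usually non-decreasing,
    // but SystemTime is not guaranteed to be monotonic.
    let first = now_millis();
    let second = now_millis();
    println!("first={first} second={second}");
}
```

Because this is the only function that touches the clock, everything that builds an Event stays deterministic and easy to compare in tests.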
One line in write_event deserves a small pause:
serde_json::to_writer(&mut *writer, &record)?;
writer is already a mutable reference, so &mut *writer can look unnecessary. The reason is that serde_json::to_writer takes its writer argument by value. If we passed writer directly, we would move that mutable reference into to_writer, and then the next line could not use writer to write the newline:
writer.write_all(b"\n")?;
&mut *writer creates a fresh temporary mutable borrow of the underlying writer for this one call. That pattern is called reborrowing. Rust usually inserts a reborrow for us when a parameter is declared as a &mut reference, which is why a mutable value can be lent briefly and then used again afterward. Here the parameter of to_writer is a generic type, so the reborrow has to be written out; this line only makes it explicit. The temporary borrow is handed to to_writer, and the original writer binding remains usable for write_all.
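The same situation can be reproduced without serde. In this sketch, write_payload is a hypothetical function that takes its writer by value through a generic parameter, which is the property of serde_json::to_writer that matters here. write_record plays the role of write_event.

```rust
use std::io::{self, Write};

/// Hypothetical helper that takes its writer by value through a generic
/// parameter, like `serde_json::to_writer` does.
fn write_payload<W: Write>(mut writer: W, payload: &str) -> io::Result<()> {
    writer.write_all(payload.as_bytes())
}

/// Writes a payload and then a newline through the same borrowed writer.
fn write_record(writer: &mut impl Write, payload: &str) -> io::Result<()> {
    // Reborrow: lend the underlying writer for this call only.
    // Passing `writer` itself would move the `&mut` into `write_payload`.
    write_payload(&mut *writer, payload)?;
    // `writer` is still usable because only the reborrow was handed over.
    writer.write_all(b"\n")
}

fn main() -> io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    write_record(&mut buf, "payload")?;
    print!("{}", String::from_utf8_lossy(&buf));
    Ok(())
}
```

Replacing &mut *writer with writer in write_record should make the compiler reject the second use of writer as a use after move.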
The first test for write_event reflects the new record shape:
File: abcb/crates/abcb-core/src/lib.rs
#[test]
fn write_event_writes_one_timestamped_newline_terminated_line() {
    let mut buf: Vec<u8> = Vec::new();
    let event = Event::ModelResponse {
        content: "ok".into(),
    };
    write_event(&mut buf, &event).expect("write_event should succeed");
    let text = std::str::from_utf8(&buf).expect("utf8");
    assert!(text.ends_with('\n'));
    assert_eq!(text.lines().count(), 1);
    let restored = read_events(buf.as_slice()).expect("read back");
    assert_eq!(restored.len(), 1);
    assert_eq!(restored[0].event, event);
    assert!(restored[0].at > 0, "write_event should stamp a real time");
}
The test does not assert the exact JSON string anymore. It cannot, because the timestamp changes on every run. Instead, it asserts the stable parts:
assert!(text.ends_with('\n'));
assert_eq!(text.lines().count(), 1);
assert_eq!(restored[0].event, event);
assert!(restored[0].at > 0);
This is the same testing habit from Chapter 13: when a value is nondeterministic, test the invariant instead of pretending the value is fixed.
14.4 Reading Timestamped Events
read_events now returns Vec<LoggedEvent>:
File: abcb/crates/abcb-core/src/lib.rs
pub fn read_events(reader: impl BufRead) -> Result<Vec<LoggedEvent>, EventLogError> {
    let mut events = Vec::new();
    for line in reader.lines() {
        let line = line?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let event: LoggedEvent = serde_json::from_str(trimmed)?;
        events.push(event);
    }
    Ok(events)
}
The function is still strict about malformed non-empty lines. If a line contains invalid JSON, it returns an error. That choice comes from the role of the event log. This is an audit trail. If the trail is corrupted, we should know.
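The skip-blank-but-fail-on-garbage shape is independent of JSON. The sketch below uses a deliberately simpler payload, one unsigned integer per line, so it runs with the standard library alone. ReadError and read_numbers are hypothetical names for this illustration, not part of abcb.

```rust
use std::io::BufRead;

/// Hypothetical error type for this sketch only.
#[derive(Debug)]
enum ReadError {
    Io(std::io::Error),
    Malformed { line_number: usize, text: String },
}

/// Reads one unsigned integer per line. Blank lines are skipped;
/// any other line that does not parse aborts the whole read.
fn read_numbers(reader: impl BufRead) -> Result<Vec<u64>, ReadError> {
    let mut numbers = Vec::new();
    for (index, line) in reader.lines().enumerate() {
        let line = line.map_err(ReadError::Io)?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let number = trimmed.parse::<u64>().map_err(|_| ReadError::Malformed {
            line_number: index + 1,
            text: trimmed.to_string(),
        })?;
        numbers.push(number);
    }
    Ok(numbers)
}

fn main() {
    // A blank line in the middle is tolerated.
    println!("{:?}", read_numbers("1\n\n2\n".as_bytes()));
    // A malformed line is an error, even though later lines are fine.
    println!("{:?}", read_numbers("1\noops\n2\n".as_bytes()));
}
```

Stopping at the first bad line is the strict choice. A more forgiving reader would collect the good lines and report the bad ones separately, but that would weaken the audit-trail guarantee described above.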
read_events is tolerant of missing timestamps, though, because of #[serde(default)]:
File: abcb/crates/abcb-core/src/lib.rs
#[test]
fn read_events_parses_jsonl_bytes_into_events() {
    let input: &[u8] = b"{\"kind\":\"user_message\",\"content\":\"hi\"}\n\
                         {\"kind\":\"final_answer\",\"content\":\"done\"}\n";
    let events = read_events(input).expect("read_events should parse");
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].at, 0);
    assert_eq!(
        events[0].event,
        Event::UserMessage {
            content: "hi".into()
        }
    );
}
This test deliberately uses old-style lines without at. The parser accepts them and gives at the default value 0.
That does not mean 0 is a real timestamp. It is a compatibility marker. A real event written by the current write_event path should have a positive timestamp. A missing timestamp from an older log becomes 0, and later code can treat that as "unknown" if it cares.
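One way later code could treat 0 as "unknown" is to convert it into an Option at the point of use. This is only a sketch: recorded_at and describe_time are hypothetical helpers, and the code assumes 0 never comes from a real clock reading.

```rust
/// Interprets the `at` field of a parsed record.
/// Assumption: 0 only comes from the serde default, never from a real clock.
fn recorded_at(at: u64) -> Option<u64> {
    if at == 0 {
        None
    } else {
        Some(at)
    }
}

/// Formats a timestamp for display, with a placeholder when it is unknown.
fn describe_time(at: u64) -> String {
    match recorded_at(at) {
        Some(millis) => format!("{millis} ms"),
        None => "unknown".to_string(),
    }
}

fn main() {
    println!("{}", describe_time(0));
    println!("{}", describe_time(1_780_000_000_000));
}
```

Keeping at as a plain u64 in the file and converting at the edge means the stored shape stays simple while readers still get an explicit "no timestamp" case.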
14.5 The Loop Receives an Event Sink
The signature of run_step changes first:
File: abcb/crates/abcb-core/src/lib.rs
pub async fn run_step(
    provider: &mut impl Provider,
    registry: &ToolRegistry,
    session: &mut Session,
    events: &mut impl Write,
) -> Result<StepOutcome, LoopError> {
This is the same pattern as the session:
session: &mut Session
events: &mut impl Write
The caller owns the session. The caller also owns the event destination. The loop borrows both while it works.
That new writer introduces one new error path. Writing can fail, so LoopError gets an event-log variant:
File: abcb/crates/abcb-core/src/lib.rs
pub enum LoopError {
    Provider(ProviderError),
    Parse(ModelOutputError),
    UnknownTool(String),
    Tool(ToolError),
    EventLog(EventLogError),
    MaxStepsExceeded { max_steps: usize },
}
And, as with the other wrapped errors, it gets a From implementation:
File: abcb/crates/abcb-core/src/lib.rs
impl From<EventLogError> for LoopError {
    fn from(e: EventLogError) -> Self {
        LoopError::EventLog(e)
    }
}
That is what lets run_step use ? after write_event:
write_event(
    events,
    &Event::ModelResponse {
        content: reply.content.clone(),
    },
)?;
An event-log failure is not recoverable by the model:
File: abcb/crates/abcb-core/src/lib.rs
LoopError::EventLog(_) => None,
That fits the recovery rule from Chapter 9. If the model chose an unknown tool or passed bad arguments, the model may fix that on the next turn. If the event log cannot be written, the model cannot repair the filesystem. The loop should stop.
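The interaction between From, the ? operator, and the recovery rule can be seen in a trimmed-down sketch. MiniLoopError, BrokenLog, and record are hypothetical names. The real LoopError wraps EventLogError and has more variants, while this version wraps io::Error directly to stay self-contained.

```rust
use std::io;

/// Trimmed-down stand-in for `LoopError`, with only two variants.
#[derive(Debug)]
enum MiniLoopError {
    UnknownTool(String),
    EventLog(io::Error),
}

/// Lets `?` convert an I/O failure into the loop's error type.
impl From<io::Error> for MiniLoopError {
    fn from(e: io::Error) -> Self {
        MiniLoopError::EventLog(e)
    }
}

impl MiniLoopError {
    /// `Some(text)` when the model could fix the problem on its next turn.
    fn recovery_feedback(&self) -> Option<String> {
        match self {
            MiniLoopError::UnknownTool(name) => Some(format!("unknown tool: {name}")),
            MiniLoopError::EventLog(_) => None,
        }
    }
}

/// A writer that always fails, to simulate a broken log destination.
struct BrokenLog;

impl io::Write for BrokenLog {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "disk full"))
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes one line; `?` turns the io::Error into MiniLoopError via From.
fn record(log: &mut impl io::Write, line: &str) -> Result<(), MiniLoopError> {
    log.write_all(line.as_bytes())?;
    Ok(())
}

fn main() {
    let log_failure = record(&mut BrokenLog, "hello").unwrap_err();
    let tool_failure = MiniLoopError::UnknownTool("stub_echoo".into());
    println!("{:?}", log_failure.recovery_feedback());
    println!("{:?}", tool_failure.recovery_feedback());
}
```

The log failure yields no feedback, so a loop built on this rule would stop, while the unknown-tool failure yields text that can be sent back to the model.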
14.6 Record Before Parsing
The most important emission point is the first one inside run_step:
File: abcb/crates/abcb-core/src/lib.rs
let reply = provider.complete(session).await?;
write_event(
    events,
    &Event::ModelResponse {
        content: reply.content.clone(),
    },
)?;
session.push_message(Message::new(Role::Assistant, reply.content.clone()));
let output = ModelOutput::parse(&reply.content)?;
The order matters.
The first line asks the provider for a reply:
let reply = provider.complete(session).await?;
Immediately after that, before ModelOutput::parse runs, the raw reply content is written as a ModelResponse event:
write_event(
    events,
    &Event::ModelResponse {
        content: reply.content.clone(),
    },
)?;
Only then do we parse the reply:
let output = ModelOutput::parse(&reply.content)?;
That means even a bad model response becomes visible in the log. If a local model returns fenced JSON, prose, or a wrong shape, ModelOutput::parse may fail, but the raw response has already been recorded.
This is exactly the kind of failure that local-first work needs to preserve. If a run fails because the model did not follow the contract, the log should contain the model's actual words, not only the parser's complaint.
This is also why we do not write a parsed verdict into the event. We could imagine adding a field like:
parsed: true
or:
parse_status: "failed"
But that would mix a durable fact with an interpretation. The durable fact is the raw model response. Whether it parses depends on the parser version and the contract we choose. Chapter 18 will make the parsing path more robust. If we store only the raw fact now, future code can reinterpret old logs with better rules.
That is the broader design decision: store facts, derive interpretations.
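The ordering rule can be shown in isolation. In this sketch, record_then_parse is a hypothetical stand-in for the first half of run_step, and the "contract" is simplified to "the reply must be a plain unsigned integer" so that no JSON parser is needed.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for the start of a step. The simplified contract:
/// the reply must be a plain unsigned integer.
fn record_then_parse(log: &mut impl Write, reply: &str) -> io::Result<Option<u64>> {
    // 1. Record the raw reply first, exactly as received.
    writeln!(log, "model_response: {reply}")?;
    // 2. Only then interpret it. A failed parse cannot erase the line above.
    Ok(reply.trim().parse::<u64>().ok())
}

fn main() -> io::Result<()> {
    let mut log: Vec<u8> = Vec::new();
    let parsed = record_then_parse(&mut log, "Sure! The answer is 42.")?;
    println!("parsed = {parsed:?}");
    print!("{}", String::from_utf8_lossy(&log));
    Ok(())
}
```

The parse fails for that reply, yet the log still holds the model's exact words. No parse verdict is stored, so a later parser with better rules could reread the same line and reach a different conclusion.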
14.7 Recording Tool Results and Final Answers
When the model asks for a tool, run_step records the tool output after invocation:
File: abcb/crates/abcb-core/src/lib.rs
let tool = registry
    .get(&tool_name)
    .ok_or_else(|| LoopError::UnknownTool(tool_name.clone()))?;
let output = tool.invoke(&arguments)?;
write_event(
    events,
    &Event::ToolResult {
        tool_name: tool_name.clone(),
        output: output.clone(),
    },
)?;
session.push_message(Message::new(Role::Tool, output.clone()));
Ok(StepOutcome::ToolExecuted { tool_name, output })
Again, the event log records what happened. The tool returned a string. That string went back into the session as a Role::Tool message. The log records the tool name and the output.
The final answer is written one layer higher, in run_loop:
File: abcb/crates/abcb-core/src/lib.rs
match run_step(provider, registry, session, events).await {
    Ok(StepOutcome::Final(answer)) => {
        write_event(
            events,
            &Event::FinalAnswer {
                content: answer.clone(),
            },
        )?;
        return Ok(answer);
    }
    Ok(StepOutcome::ToolExecuted { .. }) => {}
    Err(e) => match e.recovery_feedback() {
        Some(feedback) => session.push_message(Message::new(Role::Tool, feedback)),
        None => return Err(e),
    },
}
run_step returns the final answer as an outcome. run_loop decides that this outcome terminates the run. So run_loop is the right place to record FinalAnswer.
This keeps the meaning of the events aligned with the layers:
run_step records per-turn facts: model response, tool result.
run_loop records run-level facts: starting user message, final answer.
14.8 Recording the Starting User Message
The user message is already in the session before run_loop starts. Chapter 13 moved session creation outward, so the CLI now does this:
File: abcb/crates/abcb-cli/src/main.rs
let mut session = Session::start();
session.push_message(Message::new(Role::User, message.as_str()));
Then the loop scans the seeded session for user messages:
File: abcb/crates/abcb-core/src/lib.rs
for message in &session.messages {
    if message.role == Role::User {
        write_event(
            events,
            &Event::UserMessage {
                content: message.content.clone(),
            },
        )?;
    }
}
This may look a little indirect. Why not pass the user message into run_loop separately?
Because the session is already the source of conversation state. The caller is responsible for seeding it. The loop then records the user messages it sees there. This keeps the run's starting point tied to the session value that the provider will actually receive.
There is a tradeoff. If a session has multiple user messages before run_loop starts, all of them are recorded as starting user events. That is acceptable for now because our CLI seeds one user message. Later, if we support resuming a session with a long history, we may need a more explicit boundary between "historical messages" and "messages for this run."
For now, this is small and honest: record the user messages that were present when the loop began.
Putting the pieces together, the full run_loop now looks like this:
File: abcb/crates/abcb-core/src/lib.rs
pub async fn run_loop(
    provider: &mut impl Provider,
    registry: &ToolRegistry,
    session: &mut Session,
    max_steps: usize,
    events: &mut impl Write,
) -> Result<String, LoopError> {
    for message in &session.messages {
        if message.role == Role::User {
            write_event(
                events,
                &Event::UserMessage {
                    content: message.content.clone(),
                },
            )?;
        }
    }
    for _ in 0..max_steps {
        match run_step(provider, registry, session, events).await {
            Ok(StepOutcome::Final(answer)) => {
                write_event(
                    events,
                    &Event::FinalAnswer {
                        content: answer.clone(),
                    },
                )?;
                return Ok(answer);
            }
            Ok(StepOutcome::ToolExecuted { .. }) => {}
            Err(e) => match e.recovery_feedback() {
                Some(feedback) => session.push_message(Message::new(Role::Tool, feedback)),
                None => return Err(e),
            },
        }
    }
    Err(LoopError::MaxStepsExceeded { max_steps })
}
The placement is important. The initial scan happens once, before the step loop. run_step then records each model response and tool result. When a step returns Final, run_loop records the extracted final answer and returns it.
There is one limitation worth naming. If run_step returns a recoverable error, run_loop still appends recovery feedback to the session:
Some(feedback) => session.push_message(Message::new(Role::Tool, feedback)),
But that recovery feedback is not recorded as its own event yet. The bad model response may already be in the log, because we record model responses before parsing. The follow-up feedback message we send back to the model is only in the in-memory session.
For this snapshot, that is acceptable. The chapter's goal is to record the main loop facts: user input, raw model output, tool results, and final answer. But it also points to a future improvement. If the event log is supposed to explain a whole run after the process exits, recovery feedback should probably become visible there too. The useful thing about adding an event log is that these omissions become easier to see.
14.9 Testing the Whole Event Stream
The full-loop test writes into a Vec<u8>:
File: abcb/crates/abcb-core/src/lib.rs
#[tokio::test]
async fn run_loop_writes_the_full_event_stream_as_jsonl() {
    let mut provider = MockProvider::new([
        r#"{"kind":"tool_call","tool_name":"stub_echo","arguments":{"text":"pong"}}"#,
        r#"{"kind":"final","content":"done"}"#,
    ]);
    let registry = registry_with_stub_echo();
    let mut log: Vec<u8> = Vec::new();
    run_loop(
        &mut provider,
        &registry,
        &mut session_with_user("hi"),
        5,
        &mut log,
    )
    .await
    .expect("loop should succeed");
The test does not need a real file. Vec<u8> implements Write, so the loop can write JSONL bytes into memory.
After the loop finishes, the test reads those bytes back:
File: abcb/crates/abcb-core/src/lib.rs
let events: Vec<Event> = read_events(log.as_slice())
    .expect("log should parse")
    .into_iter()
    .map(|logged| logged.event)
    .collect();
The timestamps are not part of this assertion. They are real clock values, so the test strips the LoggedEvent wrapper and compares only the event sequence.
The expected sequence is the important part:
File: abcb/crates/abcb-core/src/lib.rs
assert_eq!(
    events,
    vec![
        Event::UserMessage {
            content: "hi".into()
        },
        Event::ModelResponse {
            content:
                r#"{"kind":"tool_call","tool_name":"stub_echo","arguments":{"text":"pong"}}"#
                    .into()
        },
        Event::ToolResult {
            tool_name: "stub_echo".into(),
            output: "pong".into()
        },
        Event::ModelResponse {
            content: r#"{"kind":"final","content":"done"}"#.into()
        },
        Event::FinalAnswer {
            content: "done".into()
        },
    ]
);
This test is the whole chapter in miniature.
The user starts the run. The model asks for a tool. The tool returns output. The model gives a final answer. The loop records each fact in order.
14.10 Wiring the File in the CLI
The CLI is still responsible for paths and files. The new helper opens the event log inside the session directory:
File: abcb/crates/abcb-cli/src/main.rs
fn open_event_log(session_dir: &Path) -> io::Result<BufWriter<File>> {
    let file = OpenOptions::new()
        .append(true)
        .create(true)
        .open(session_dir.join("events.jsonl"))?;
    Ok(BufWriter::new(file))
}
This helper belongs in the CLI because it decides where bytes live:
session_dir.join("events.jsonl")
The core loop only receives a writer. It does not know about .abcb, session directories, or file names.
The run command opens the log after it creates the session directory:
File: abcb/crates/abcb-cli/src/main.rs
let dir = create_session_dir(config.memory_dir(), &session.id)?;
let mut events = open_event_log(&dir)?;
let mut provider = build_provider(&config)?;
let answer = run_loop(
    &mut provider,
    &registry,
    &mut session,
    config.max_steps(),
    &mut events,
)
.await?;
events.flush()?;
The explicit flush is not strictly the only way bytes can reach disk, because dropping a BufWriter also tries to flush. But a flush on drop ignores any error, so a failed final write would go unnoticed. Flushing explicitly surfaces that error through ? and makes the write boundary visible. The run is done. The answer has been returned. Before printing and exiting, the CLI asks the buffered writer to push its remaining bytes out.
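The buffering itself is easy to observe. This sketch wraps a Vec<u8> instead of a File so that it needs no filesystem. buffered_then_flushed is a hypothetical helper, and the "zero bytes before flush" observation relies on the line being much smaller than BufWriter's internal buffer.

```rust
use std::io::{self, BufWriter, Write};

/// One short log line, far smaller than BufWriter's internal buffer.
const LINE: &[u8] = b"{\"kind\":\"final_answer\"}\n";

/// Returns (bytes in the destination before flush, bytes after flush).
fn buffered_then_flushed() -> io::Result<(usize, usize)> {
    // A Vec<u8> stands in for the file.
    let mut writer = BufWriter::new(Vec::new());
    writer.write_all(LINE)?;
    // The line is still held in BufWriter's buffer at this point.
    let before = writer.get_ref().len();
    // flush() pushes the buffered bytes to the destination and reports errors.
    writer.flush()?;
    let after = writer.get_ref().len();
    Ok((before, after))
}

fn main() -> io::Result<()> {
    let (before, after) = buffered_then_flushed()?;
    println!("before flush: {before} bytes, after flush: {after} bytes");
    Ok(())
}
```

With a real file the same gap exists: until the flush, a reader of events.jsonl may see fewer lines than the loop has written.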
The mock path opens and flushes the log the same way, only with the default memory root and default max-steps value:
File: abcb/crates/abcb-cli/src/main.rs
let dir = create_session_dir(Path::new(DEFAULT_MEMORY_DIR), &session.id)?;
let mut events = open_event_log(&dir)?;
let mut provider = MockProvider::new([format!(
    r#"{{"kind":"final","content":"mock run: you said {message}"}}"#
)]);
let answer = run_loop(
    &mut provider,
    &registry,
    &mut session,
    DEFAULT_MAX_STEPS,
    &mut events,
)
.await?;
events.flush()?;
The provider choice does not decide whether a run is recorded. Both mock and real runs get a session directory and an event log.
14.11 Updating Replay
Because read_events now returns LoggedEvent, replay needs to look through the wrapper:
File: abcb/crates/abcb-cli/src/main.rs
for (index, logged) in events.iter().enumerate() {
    let (kind, content) = match &logged.event {
        Event::UserMessage { content } => ("user_message", content.clone()),
        Event::ModelResponse { content } => ("model_response", content.clone()),
        Event::ToolResult { tool_name, output } => {
            ("tool_result", format!("{tool_name}: {output}"))
        }
        Event::FinalAnswer { content } => ("final_answer", content.clone()),
    };
    println!("[{}] {kind}: {content}", index + 1);
}
There are two small points here.
First, replay ignores at for now. That is not because timestamps are useless. It is because this command is still a simple event printer. Later, a summary command can use timestamps if it needs duration or ordering metadata.
Second, ToolResult does not have one content field. It has a tool name and output. Replay formats those into one string:
format!("{tool_name}: {output}")
That is a display decision, not a storage decision. The stored event keeps the fields separate.
14.12 Why Not an Observer Callback?
There was another possible design for this chapter:
pub async fn run_loop(
    provider: &mut impl Provider,
    registry: &ToolRegistry,
    session: &mut Session,
    max_steps: usize,
    mut on_event: impl FnMut(&Event),
) -> Result<String, LoopError> {
The mut appears before the argument name, not in the type:
mut on_event: impl FnMut(&Event)
That means the local binding named on_event is mutable inside run_loop. The type is still impl FnMut(&Event). We need the binding itself to be mutable because calling an FnMut closure may change the closure's captured state.
Instead of passing a writer, the loop could call a callback whenever something happened:
on_event(&Event::ModelResponse {
    content: reply.content.clone(),
});
That would be a more general observer. A caller could write JSONL, update a UI, send events over a socket, collect metrics, or ignore them.
It is a tempting abstraction, but it is too early for this project.
Right now, the format is fixed: JSONL. The destination is fixed: events.jsonl under the session directory. The existing primitive already knows how to write one event as JSONL:
write_event(&mut writer, &event)?;
A callback also complicates the error story. If the observer writes to disk, it can fail. If it sends an event over a socket, it can fail in a different way. If it updates an in-memory UI, it may not fail at all. The loop would need to decide what kind of error an observer is allowed to return and whether that error should abort the run.
With &mut impl Write, the answer is clear. This chapter is about recording JSONL. Writing JSONL can fail with EventLogError. That error becomes LoopError::EventLog, and the model cannot recover from it.
So the current choice is narrower, and that is the point. We are not building a general event bus yet. We are making the loop durable.
Rust: Fn, FnMut, and FnOnce
Closures in Rust implement one or more closure traits depending on how they use captured values. Fn is for closures that can be called through an immutable reference. They do not need to mutate captured state. FnMut is for closures that may mutate captured state. A callback that pushes events into a Vec<Event> would need FnMut, because each call changes the vector. FnOnce is for closures that are only guaranteed to be callable once, because calling them may consume captured values. If a closure moves a captured String out of itself, it cannot be called again.
An event observer for a loop would usually be FnMut, because the observer often accumulates state: append to a buffer, update a counter, or send into a mutable sink. We are not using that design yet, but this is the trait family behind it.
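A small sketch shows why an accumulating observer lands on FnMut. emit_all and collect_kinds are hypothetical, and events are reduced to plain string names so the example stands alone.

```rust
/// Hypothetical emitter: calls the observer once per event name.
/// `mut on_event` is needed because calling an FnMut requires a mutable binding.
fn emit_all(mut on_event: impl FnMut(&str)) {
    for kind in ["user_message", "model_response", "final_answer"] {
        on_event(kind);
    }
}

/// Collects every observed event name into a vector.
fn collect_kinds() -> Vec<String> {
    let mut seen = Vec::new();
    // The closure mutates `seen` on every call, so it is FnMut, not Fn.
    emit_all(|kind| seen.push(kind.to_string()));
    seen
}

fn main() {
    println!("{:?}", collect_kinds());
}
```

Note that this observer cannot report failure: the closure returns nothing, so the error question raised above has nowhere to go without changing the signature.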
The callback design may return later. If abcb becomes a daemon that streams live events to Godot, or if multiple observers need to watch the same run, a callback or observer trait may earn its place. For now, one writer is enough.
14.13 What Changed
Chapter 14 makes a full agent run observable. run now creates an events.jsonl file inside the session directory, and the core loop writes events into it as the run proceeds.
The event stream records the starting user message, raw model responses, tool results, and the final answer. The raw model response is written before parsing, so malformed model output is still preserved for debugging.
The Rust lesson is that &mut impl Write scales farther than it first looked. In Chapter 5 it let us test write_event with a memory buffer. In this chapter it becomes an argument to run_step and run_loop, so the whole loop can be tested without touching the filesystem.
We also added LoggedEvent, a timestamped wrapper around Event. Event stays pure and deterministic. write_event stamps time at the I/O edge. #[serde(flatten)] keeps the JSONL shape flat, and #[serde(default)] lets older untimestamped lines still parse.
The next chapter uses this persistence foundation in a different direction. The run now has an audit trail. Next, the agent starts to get memory that can survive beyond one run.
To be continued