GPT-like multimodal chatbot

https://github.com/dagworks-inc/burr by DAGWorks Inc. (YC W23 & StartX).

Take🏠:

  • a high-level overview of what Burr is

  • what you can do with Burr (observe state & debug from a particular point in time)

  • watch a walkthrough of this notebook here.

Agentic Problems

  1. Why did this LLM call fail?

  2. Oh crap my code broke, why?

  3. Things went off the rails, but where?

  4. etc.

Monitoring for the win, right?

Well, but … monitoring alone doesn’t help you debug & complete your dev loop:

  1. How do I debug that quickly?

  2. How do I fix the inputs/code, and restart my agent?

  3. What if my agent was 20+ steps in … do I have to restart from step 0? or can I go to a specific point in time?

Solution: Burr

(Complements our other framework Hamilton)

1. The agent application is modeled as State + Actions → Graph
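
Before the full example, here is a minimal sketch of that model: an action is a plain function that declares what it reads from and writes to an immutable State (this toy sketch assumes only the APIs used later in this notebook).

from burr.core import State
from burr.core.action import action

# Toy action: read "count" from state, write the incremented value back.
@action(reads=["count"], writes=["count"])
def increment(state: State) -> State:
    return state.update(count=state["count"] + 1)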

Now for the full multi-modal example:

import copy

from IPython.display import Image, display
import openai

from burr.core import ApplicationBuilder, State, default, graph, when
from burr.core.action import action
from burr.tracking import LocalTrackingClient

MODES = {
    "answer_question": "text",
    "generate_image": "image",
    "generate_code": "code",
    "unknown": "text",
}
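# MODES doubles as the router's vocabulary: its keys are the valid outputs
# of choose_mode below, and its values tag the `type` of each response.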


@action(reads=[], writes=["chat_history", "prompt"])
def process_prompt(state: State, prompt: str) -> State:
    result = {"chat_item": {"role": "user", "content": prompt, "type": "text"}}
    state = state.append(chat_history=result["chat_item"])
    state = state.update(prompt=prompt)
    return state


@action(reads=["prompt"], writes=["mode"])
def choose_mode(state: State) -> State:
    prompt = (
        f"You are a chatbot. You've been prompted this: {state['prompt']}. "
        f"You have the capability of responding in the following modes: {', '.join(MODES)}. "
        "Please respond with *only* a single word representing the mode that most accurately "
        "corresponds to the prompt. Fr instance, if the prompt is 'draw a picture of a cat', "
        "the mode would be 'generate_image'. If the prompt is "
        "'what is the capital of France', the mode would be 'answer_question'."
        "If none of these modes apply, please respond with 'unknown'."
    )

    llm_result = openai.Client().chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant"},
            {"role": "user", "content": prompt},
        ],
    )
    content = llm_result.choices[0].message.content
    mode = content.lower()
    if mode not in MODES:
        mode = "unknown"
    result = {"mode": mode}
    return state.update(**result)


@action(reads=["prompt", "chat_history"], writes=["response"])
def prompt_for_more(state: State) -> State:
    result = {
        "response": {
            "content": "None of the response modes I support apply to your question. "
                       "Please clarify?",
            "type": "text",
            "role": "assistant",
        }
    }
    return state.update(**result)


@action(reads=["prompt", "chat_history", "mode"], writes=["response"])
def chat_response(
        state: State, prepend_prompt: str, model: str = "gpt-3.5-turbo"
) -> State:
    # Prefix the latest user message with this node's bound prepend_prompt.
    chat_history = copy.deepcopy(state["chat_history"])
    chat_history[-1]["content"] = f"{prepend_prompt}: {chat_history[-1]['content']}"
    chat_history_api_format = [
        {
            "role": chat["role"],
            "content": chat["content"],
        }
        for chat in chat_history
    ]
    client = openai.Client()
    result = client.chat.completions.create(
        model=model,
        messages=chat_history_api_format,
    )
    text_response = result.choices[0].message.content
    result = {"response": {"content": text_response, "type": MODES[state["mode"]], "role": "assistant"}}
    return state.update(**result)


@action(reads=["prompt", "chat_history", "mode"], writes=["response"])
def image_response(state: State, model: str = "dall-e-2") -> State:
    """Generates an image response to the prompt. Optional save function to save the image to a URL."""
    # raise ValueError("Demo error")
    client = openai.Client()
    result = client.images.generate(
        model=model, prompt=state["prompt"], size="1024x1024", quality="standard", n=1
    )
    image_url = result.data[0].url
    result = {"response": {"content": image_url, "type": MODES[state["mode"]], "role": "assistant"}}
    return state.update(**result)


@action(reads=["response", "mode"], writes=["chat_history"])
def response(state: State) -> State:
    # you'd do something specific here based on prior state
    result = {"chat_item": state["response"]}
    return state.append(chat_history=result["chat_item"])

# Build the graph.
base_graph = (
    graph.GraphBuilder()
    .with_actions(
        # these are the "nodes" 
        prompt=process_prompt,
        decide_mode=choose_mode,
        generate_image=image_response,
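        # .bind() pins an action parameter up front, so one function can
        # back multiple nodes: generate_code and answer_question below both
        # reuse chat_response with different prepend_prompts.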
        generate_code=chat_response.bind(
            prepend_prompt="Please respond with *only* code and no other text (at all) to the following:",
        ),
        answer_question=chat_response.bind(
            prepend_prompt="Please answer the following question:",
        ),
        prompt_for_more=prompt_for_more,
        response=response,
    )
    .with_transitions(
        # these are the edges between nodes, based on state.
        ("prompt", "decide_mode", default),
        ("decide_mode", "generate_image", when(mode="generate_image")),
        ("decide_mode", "generate_code", when(mode="generate_code")),
        ("decide_mode", "answer_question", when(mode="answer_question")),
        ("decide_mode", "prompt_for_more", default),
        (
            ["generate_image", "answer_question", "generate_code", "prompt_for_more"],
            "response",
        ),
        ("response", "prompt", default),
    )
    .build()
)
base_graph.visualize()
[Output: rendered graph of the application's actions and transitions]
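
To keep a copy of the diagram, visualize can also write it to disk; the parameter names below mirror Burr's visualize API but treat them as assumptions for your installed version.

# Write the diagram to a file (parameter names assumed; adjust if your
# Burr version differs).
base_graph.visualize(output_file_path="statemachine", include_conditions=True)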

2. Build the application → built-in checkpointing & tracking

tracker = LocalTrackingClient(project="agent-demo")
app = (
    ApplicationBuilder()
    .with_graph(base_graph)
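    # initialize_from: resume from the tracker's checkpoints when they exist
    # for this app; otherwise start fresh at default_entrypoint with default_state.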
    .initialize_from(
        tracker, 
        resume_at_next_action=True, 
        default_state={"chat_history": []},
        default_entrypoint="prompt",
    )
    .with_tracker(tracker)  # tracking + checkpointing; one line 🪄.
    .build()
)
app
[Output: rendered application graph]
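
Besides run(), the application can be advanced one action at a time; an illustrative sketch (not executed here) using app.step, the same method run() drives internally, as the traceback further below shows.

# Advance exactly one action and inspect what happened (sketch).
step_action, step_result, step_state = app.step(inputs={"prompt": "hello"})
print(step_action.name, step_state["prompt"])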

3. Comes with a UI

View runs in the UI. Let’s run the app first.

while True:
    user_input = input("Hi, how can I help?")
    if "quit" == user_input.lower():
        break
    last_action, action_result, app_state = app.run(
        halt_after=["response"], 
        inputs={"prompt": user_input}
    )
    last_message = app_state["chat_history"][-1]
    if last_message['type'] == 'image':
        display(Image(url=last_message["content"]))
    else:
        print(f"🤖: {last_message['content']}")
Hi, how can I help? what is the capital of France?
🤖: The capital of France is Paris.
Hi, how can I help? write hello world in java
🤖: ```
public class HelloWorld {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}
```
Hi, how can I help? draw a pen
********************************************************************************
-------------------------------------------------------------------
Oh no an error! Need help with Burr?
Join our discord and ask for help! https://discord.gg/4FxBMyzW5n
-------------------------------------------------------------------
> Action: `generate_image` encountered an error!<
> State (at time of action):
{'__PRIOR_STEP': 'decide_mode',
 '__SEQUENCE_ID': 10,
 'chat_history': "[{'role': 'user', 'content': 'what is the capital ...",
 'mode': 'generate_image',
 'prompt': 'draw a pen',
 'response': "{'content': '```\\npublic class HelloWorld {\\n    p..."}
> Inputs (at time of action):
{'prompt': 'draw a pen'}
********************************************************************************
Traceback (most recent call last):
  File "/Users/stefankrawczyk/dagworks/burr/burr/core/application.py", line 534, in _step
    result, new_state = _run_single_step_action(
  File "/Users/stefankrawczyk/dagworks/burr/burr/core/application.py", line 233, in _run_single_step_action
    action.run_and_update(state, **inputs), action.name
  File "/Users/stefankrawczyk/dagworks/burr/burr/core/action.py", line 533, in run_and_update
    return self._fn(state, **self._bound_params, **run_kwargs)
  File "/var/folders/gv/q39lb_1s26x7gbyyypqc3dkm0000gn/T/ipykernel_43564/1354917547.py", line 94, in image_response
    raise ValueError("Demo error")
ValueError: Demo error
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[12], line 5
      3 if "quit" == user_input.lower():
      4     break
----> 5 last_action, action_result, app_state = app.run(
      6     halt_after=["response"], 
      7     inputs={"prompt": user_input}
      8 )
      9 last_message = app_state["chat_history"][-1]
     10 if last_message['type'] == 'image':

File ~/dagworks/burr/burr/telemetry.py:273, in capture_function_usage.<locals>.wrapped_fn(*args, **kwargs)
    270 @functools.wraps(call_fn)
    271 def wrapped_fn(*args, **kwargs):
    272     try:
--> 273         return call_fn(*args, **kwargs)
    274     finally:
    275         if is_telemetry_enabled():

File ~/dagworks/burr/burr/core/application.py:878, in Application.run(self, halt_before, halt_after, inputs)
    876 while True:
    877     try:
--> 878         next(gen)
    879     except StopIteration as e:
    880         return e.value

File ~/dagworks/burr/burr/core/application.py:823, in Application.iterate(self, halt_before, halt_after, inputs)
    820 prior_action: Optional[Action] = None
    821 while self.has_next_action():
    822     # self.step will only return None if there is no next action, so we can rely on tuple unpacking
--> 823     prior_action, result, state = self.step(inputs=inputs)
    824     yield prior_action, result, state
    825     if self._should_halt_iterate(halt_before, halt_after, prior_action):

File ~/dagworks/burr/burr/core/application.py:495, in Application.step(self, inputs)
    492 # we need to increment the sequence before we start computing
    493 # that way if we're replaying from state, we don't get stuck
    494 self._increment_sequence_id()
--> 495 out = self._step(inputs=inputs, _run_hooks=True)
    496 return out

File ~/dagworks/burr/burr/core/application.py:548, in Application._step(self, inputs, _run_hooks)
    546     exc = e
    547     logger.exception(_format_BASE_ERROR_MESSAGE(next_action, self._state, inputs))
--> 548     raise e
    549 finally:
    550     if _run_hooks:

File ~/dagworks/burr/burr/core/application.py:534, in Application._step(self, inputs, _run_hooks)
    532 try:
    533     if next_action.single_step:
--> 534         result, new_state = _run_single_step_action(
    535             next_action, self._state, action_inputs
    536         )
    537     else:
    538         result = _run_function(
    539             next_action, self._state, action_inputs, name=next_action.name
    540         )

File ~/dagworks/burr/burr/core/application.py:233, in _run_single_step_action(action, state, inputs)
    230 # TODO -- guard all reads/writes with a subset of the state
    231 action.validate_inputs(inputs)
    232 result, new_state = _adjust_single_step_output(
--> 233     action.run_and_update(state, **inputs), action.name
    234 )
    235 _validate_result(result, action.name)
    236 out = result, _state_update(state, new_state)

File ~/dagworks/burr/burr/core/action.py:533, in FunctionBasedAction.run_and_update(self, state, **run_kwargs)
    532 def run_and_update(self, state: State, **run_kwargs) -> tuple[dict, State]:
--> 533     return self._fn(state, **self._bound_params, **run_kwargs)

Cell In[8], line 94, in image_response(state, model)
     91 @action(reads=["prompt", "chat_history", "mode"], writes=["response"])
     92 def image_response(state: State, model: str = "dall-e-2") -> State:
     93     """Generates an image response to the prompt. Optional save function to save the image to a URL."""
---> 94     raise ValueError("Demo error")
     95     client = openai.Client()
     96     result = client.images.generate(
     97         model=model, prompt=state["prompt"], size="1024x1024", quality="standard", n=1
     98     )

ValueError: Demo error

Open the UI at http://localhost:7241/ (if it isn’t running yet, install the extras with pip install "burr[start]" and run burr in a terminal).

But something broke / I want to debug

Use:

  • Application ID
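
The application ID for a run can be copied from the Burr UI; it should also be available on the application object itself (the attribute name used below is an assumption).

# Grab the ID of the app we just ran (assumes Application exposes it as
# `uid`; copy it from the UI if your version differs).
print(app.uid)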

app_id = "a6d74912-9ad6-42f0-9a18-bc17c5e77eaf"
resumed_app = (
    ApplicationBuilder()
    .with_graph(base_graph)
    .initialize_from(
        tracker,
        resume_at_next_action=True, 
        default_state={"chat_history": []},
        default_entrypoint="prompt",
    )
    .with_tracker(tracker)
    .with_identifiers(app_id=app_id)
    .build()
)
resumed_app.state["chat_history"]
[{'role': 'user', 'content': 'what is the capital of France?', 'type': 'text'},
 {'content': 'The capital of France is Paris.',
  'type': 'text',
  'role': 'assistant'},
 {'role': 'user', 'content': 'write hello world in java', 'type': 'text'},
 {'content': '```\npublic class HelloWorld {\n    public static void main(String[] args) {\n        System.out.println("Hello, World!");\n    }\n}\n```',
  'type': 'code',
  'role': 'assistant'},
 {'role': 'user', 'content': 'draw a pen', 'type': 'text'}]
while True:
    user_input = input("Hi, how can I help?")
    if "quit" == user_input.lower():
        break
    last_action, action_result, app_state = resumed_app.run(
        halt_after=["response"], 
        inputs={"prompt": user_input}
    )
    last_message = app_state["chat_history"][-1]
    if last_message['type'] == 'image':
        display(Image(url=last_message["content"]))
    else:
        print(f"🤖: {last_message['content']}")
Hi, how can I help? 
Hi, how can I help? what is the capital of England?
🤖: The capital of England is London.
Hi, how can I help? quit

Actually what if I want to go back to a certain point in time?

  • Fork: Start with state from any checkpoint

app_id = "a6d74912-9ad6-42f0-9a18-bc17c5e77eaf"
sequence_id = 4
# partition_key = ""
forked_app = (
    ApplicationBuilder()
    .with_graph(base_graph) # this could be different...
    .initialize_from(
        tracker,
        resume_at_next_action=True, 
        default_state={"chat_history": []},
        default_entrypoint="prompt",
        fork_from_app_id=app_id,
        fork_from_sequence_id=sequence_id,
        # fork_from_partition_key=partition_key
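        # fork_from_*: copy state as of that app_id + sequence_id into a
        # brand-new application, leaving the original run untouched.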
    )
    .with_tracker(tracker)
    .build()
)
# show prior forked state
forked_app.state["chat_history"]
[{'role': 'user', 'content': 'what is the capital of France?', 'type': 'text'},
 {'content': 'The capital of France is Paris.',
  'type': 'text',
  'role': 'assistant'},
 {'role': 'user', 'content': 'write hello world in java', 'type': 'text'}]
while True:
    user_input = input("Hi, how can I help?")
    if "quit" == user_input.lower():
        break
    last_action, action_result, app_state = forked_app.run(
        halt_after=["response"], 
        inputs={"prompt": user_input}
    )
    last_message = app_state["chat_history"][-1]
    if last_message['type'] == 'image':
        display(Image(url=last_message["content"]))
    else:
        print(f"🤖: {last_message['content']}")
Hi, how can I help? 
🤖: ```java
public class HelloWorld {
    public static void main(String[] args) {
        System.out.println("Hello, World!");
    }
}
```

Want to know more?

Link to video walking through this notebook.

https://github.com/dagworks-inc/burr

Time Travel blog post & video:

More blogs at blog.dagworks.io, e.g. on async & streaming.

More examples:

Follow on Twitter & LinkedIn:

  • https://x.com/burr_framework

  • https://x.com/dagworks

  • https://x.com/stefkrawczyk

  • https://www.linkedin.com/in/skrawczyk/