feat(streaming): proof of concept for streaming agentservice execution #549

Closed
douglas-reid wants to merge 1 commit into main from doug/streaming-updates-part-one
Conversation

@douglas-reid
Contributor

This PR presents a series of changes that should support a way to stream response information back to a client via an AgentService.

In order to achieve the streaming result, a new method on the AgentService is exposed: async_prompt. This new method returns a new StreamingResponse that has two fields: task and file. These fields provide access to (a) the async task that will be streaming results and (b) a file (here the ChatHistory file) to which all status messages and assistant interactions will be saved.

This PR relies on a full deployment of steamship-core/gpt4#10 to the target environment for testing / validation.
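
As a rough illustration of the shape described above, here is a minimal sketch of a response that carries both a task handle and a file handle. Only `async_prompt`, `StreamingResponse`, and its `task` and `file` fields come from this PR; the `Task` and `File` stand-ins, `SketchAgentService`, `_schedule_prompt`, `_history_file_for`, and the `"default"` fallback ID are assumptions made up for the example and are not the SDK's types.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Task:
    """Hypothetical stand-in for a handle to scheduled work."""
    task_id: str
    state: str = "waiting"


@dataclass
class File:
    """Hypothetical stand-in for the ChatHistory file."""
    file_id: str


@dataclass
class StreamingResponse:
    """The two fields named in the PR description."""
    task: Task
    file: File


class SketchAgentService:
    """Toy service: shows only the return shape, not real scheduling."""

    def __init__(self) -> None:
        self._history_files: Dict[str, File] = {}

    def _history_file_for(self, context_id: str) -> File:
        # One history file per context, created on first use (assumption).
        return self._history_files.setdefault(
            context_id, File(file_id=f"file-{context_id}")
        )

    def _schedule_prompt(self, prompt: Optional[str], context_id: str) -> Task:
        # Stand-in for scheduling the real work; nothing runs here.
        return Task(task_id=str(uuid.uuid4()))

    def async_prompt(
        self, prompt: Optional[str] = None, context_id: Optional[str] = None
    ) -> StreamingResponse:
        ctx_id = context_id or "default"  # fallback ID is an assumption
        task = self._schedule_prompt(prompt, ctx_id)
        # Return early with what a client needs to monitor the work.
        return StreamingResponse(task=task, file=self._history_file_for(ctx_id))
```

Two calls with the same `context_id` return distinct tasks but the same file, which is the property a client would rely on to follow one conversation across requests.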

@douglas-reid douglas-reid requested review from dkolas and eob September 12, 2023 04:57
@douglas-reid douglas-reid force-pushed the doug/streaming-updates-part-one branch from 5936e1f to 5101924 Compare September 12, 2023 04:58
@post("async_prompt")
def async_prompt(
    self, prompt: Optional[str] = None, context_id: Optional[str] = None, **kwargs
) -> StreamingResponse:
Contributor Author

should we just use InvocableResponse(status=task, data=ctx.chat_history.file) instead?

Contributor

Data = the file makes sense to me, rather than having a new type. See below, I'm not sure I yet understand the task bit.

Contributor

It's also possible the caller might want a more direct reference to the new blocks that are being streamed as a result of this request?

Contributor

That feels like a really clean return type to me

Contributor Author

@dkolas we don't have any way to cascade pre-allocated blocks within a File back -- and I'm not entirely sure that we can (given the current Agent + AgentService architecture). For instance, the status message blocks, by their very nature, cannot be pre-allocated. Any scheme of pre-allocation would then result in out-of-order messaging, etc.

I think File may be the "best" we can do easily. For things that don't require an Agent and Action and status messages, that may not be the case.

Let's talk this through in person.

scope = history_msgs[len(history_msgs) - limit :]
for block in scope:
if is_user_message(block) or is_assistant_message(block):
if is_user_history_message(block):
Contributor Author

probably don't need this check anymore.

Contributor

I think it depends on /why/ someone is calling get_messages?

E.g. if I just want the chat transcript to prepare a chat prompt, I'd still want these kinds of things filtered out, but if I want things like status messages or function calls, then I'd want it.
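
One way to serve both kinds of caller is a flag on the accessor. The sketch below is only an illustration of that idea: the `Kind` enum, the `Msg` type, and the `transcript_only` parameter are hypothetical names, not the existing `get_messages` signature. Note that it filters before applying the limit, which differs from the snippet above, where the window is taken first.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class Kind(Enum):
    USER = "user"
    ASSISTANT = "assistant"
    STATUS = "status"
    FUNCTION_CALL = "function_call"


@dataclass
class Msg:
    kind: Kind
    text: str


# Message kinds that belong in a plain chat transcript.
TRANSCRIPT_KINDS = frozenset({Kind.USER, Kind.ASSISTANT})


def get_messages(
    history: List[Msg],
    limit: Optional[int] = None,
    transcript_only: bool = True,
) -> List[Msg]:
    """Return the most recent messages, optionally hiding non-transcript kinds."""
    selected = [
        m for m in history if not transcript_only or m.kind in TRANSCRIPT_KINDS
    ]
    if limit is None:
        return selected
    # Guard against limit == 0, since selected[-0:] would return everything.
    return selected[-limit:] if limit > 0 else []
```

A prompt builder would call it with the default, while a streaming client that wants status messages and function calls would pass `transcript_only=False`.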

Contributor

@eob eob left a comment

A few minor comments, but this looks awesome to me.

  • It's amazing that streaming has been done in a way where you can't actually see any explicit place where we say we're streaming. I think that's a sign that this design will "Just Work" without streaming being a special case folks tiptoe around 🚀🚀🚀
  • +1 to a few of your "huh.. do we need to change this?" comments within.


def _function_calls_since_last_user_message(self, context: AgentContext) -> List[Block]:
    function_calls = []
    for block in context.chat_history.messages[::-1]:  # is this too inefficient at scale?
Contributor

This is exactly the line of code I've been wondering about too.

Do you think that if we put some kind of optimized query into the engine/database, one that returns either the right subset of history or on-the-fly tags that demarcate certain things like this, it could be both faster and let the Python half be simpler?

Contributor

(fwiw I think it's fine to pursue the slower-but-simple solution now and then we'll optimize it after proving to ourselves it works)
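
For the slower-but-simple route, a minimal sketch of the backward scan is below. It assumes the history is already an in-memory list and uses a hypothetical `Msg` type with made-up role labels, so it says nothing about the cost of loading the messages in the first place. The one concrete difference from the snippet above is that `reversed()` walks the list without building the reversed copy that `[::-1]` creates, and the loop stops at the first user message it meets.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Msg:
    role: str  # "user", "assistant", or "function" (labels assumed for this sketch)
    text: str


def function_calls_since_last_user_message(messages: List[Msg]) -> List[Msg]:
    """Collect function-call messages that follow the most recent user message."""
    calls: List[Msg] = []
    for msg in reversed(messages):  # lazy reverse iteration, no list copy
        if msg.role == "user":
            break  # everything earlier belongs to a previous turn
        if msg.role == "function":
            calls.append(msg)
    calls.reverse()  # chronological order is a choice made for this sketch
    return calls
```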

value={TagValueKey.STRING_VALUE: action.tool},
),
]
# TODO(dougreid): I'm not convinced this is correct for tools that return multiple values.
Contributor

Slightly different thought, but the N -> N thing has proven to be a real bear to wrestle with. I find myself writing output[0].text all over the place, which doesn't smell right at the application-code level.


return generate_task.output.blocks

def _from_same_file(self, blocks: List[Block]) -> bool:
Contributor

nit: A comment here about strategy would be super useful.

Above, it seems like this is being used like this:

if the blocks we're working with are all from the same file:
  keep it going
else:
  use a temp file
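
Spelled out as code, that reading of the strategy might look like the sketch below. It is a guess at the intent, not the PR's implementation: the `Block` stand-in, `choose_input_file`, and the `make_temp_file` callback are hypothetical, and treating blocks without a file ID as "not from the same file" is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Block:
    """Hypothetical stand-in: a block that may or may not belong to a file."""
    text: str
    file_id: Optional[str] = None


def from_same_file(blocks: List[Block]) -> bool:
    """True only when every block has a file ID and they are all identical."""
    file_ids = {b.file_id for b in blocks}
    return len(file_ids) == 1 and None not in file_ids


def choose_input_file(
    blocks: List[Block], make_temp_file: Callable[[List[Block]], str]
) -> str:
    # Same file: keep generating against it, so output can be appended in place.
    if from_same_file(blocks):
        return blocks[0].file_id
    # Mixed or file-less blocks: copy them into a temporary file first.
    return make_temp_file(blocks)
```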

# if no context ID is provided, we need to make sure that the streaming context ID
# is the same one as the non-streaming.
if not ctx_id:
    ctx_file = context.chat_history.file
Contributor

Maybe we could push this into AgentContext as a get_default_id method

# if you can't find a consistent context_id, then there is no way to provide an accurate
# streaming endpoint.
if not ctx_id:
    # TODO(dougreid): this points to a slight flaw in the context_keys vs. context_id
Contributor

I've gotten the same feeling in use too, since it almost always is the case that just a single ID is used rather than a dict to distinguish.

But the concept of a multi-valued identifier still feels like it makes sense.. maybe the right thing to do is see if we can all put heads together and come up with something that would ONLY work with a full dict as its identifier rather than a string?
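
To make the dict-as-identifier idea concrete, here is one possible sketch: derive a deterministic string from the full dict so that a multi-valued key and a single ID can share the same lookup path. This is an illustration only. `context_id_from_keys` is a hypothetical helper, and hashing canonical JSON is an assumption for the example, not how `context_keys` is resolved today.

```python
import hashlib
import json
from typing import Dict


def context_id_from_keys(context_keys: Dict[str, str]) -> str:
    """Derive a stable identifier from a dict of context keys."""
    # sort_keys makes the serialization independent of insertion order.
    canonical = json.dumps(context_keys, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

With something like this, the streaming and non-streaming paths could both compute the same ID from the same keys, instead of one path receiving a string and the other a dict.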


original_len = len(file.blocks)

task.wait()
Contributor

The fact that we can still wait on the task is 💯 .

Contributor

If I understand correctly, though, this only waits on the invoke_later callback within async_prompt, which doesn't wait on the blocks to be streamed. Am I correct, @douglas-reid?

Contributor Author

@dkolas it is complicated. The existing LLM chat() call waits on the full generate() at the moment. This may change when we add streaming=True, but I think we'll want to make that have a way to wait on all the returned blocks to finish streaming (at least, something in the Agent must, as the next action must be selected).

So, at the moment, that task is waiting on everything. And, it feels like it will always do that. To stream, the client would ignore it and watch the File events (which I thought was the way we wanted it).
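
The two consumption modes can be sketched with plain threads. Everything in this example is a stand-in: `FakeFile`, `run_agent`, and `watch_new_blocks` are hypothetical, polling is used in place of whatever File event mechanism the client would really have, and joining the thread plays the role of `task.wait()`. The `original_len` offset mirrors the test snippet above.

```python
import threading
import time
from typing import List


class FakeFile:
    """Hypothetical stand-in for a chat-history file that grows over time."""

    def __init__(self) -> None:
        self._blocks: List[str] = []
        self._lock = threading.Lock()

    def append(self, text: str) -> None:
        with self._lock:
            self._blocks.append(text)

    def blocks_from(self, start: int) -> List[str]:
        with self._lock:
            return self._blocks[start:]


def run_agent(file: FakeFile) -> None:
    """Stand-in for the scheduled work: posts status and assistant blocks."""
    for text in ["status: selecting tool", "status: running tool", "assistant: done"]:
        time.sleep(0.01)
        file.append(text)


def watch_new_blocks(
    file: FakeFile, worker: threading.Thread, start: int, poll_seconds: float = 0.005
) -> List[str]:
    """Streaming client: ignore the task result and collect blocks as they appear."""
    seen: List[str] = []
    while True:
        # Check liveness before reading so the final read cannot miss a block.
        finished = not worker.is_alive()
        seen.extend(file.blocks_from(start + len(seen)))
        if finished:
            return seen
        time.sleep(poll_seconds)
```

A non-streaming caller would skip `watch_new_blocks` entirely, wait for the worker to finish, and then read everything past `original_len` in one go.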

Contributor

@dkolas dkolas left a comment

Code looks good. We may want a sync discussion of how the client might expect to consume the stream to make sure we're sending the things that make it as easy as possible.

Comment on lines +331 to +333
task = self.invoke_later(
    "/prompt", arguments={"prompt": prompt, "context_id": ctx_id, **kwargs}
)
Contributor

Forgive me if I'm being dense, but what do we gain out of making this invoke_later?

Contributor Author

We use invoke_later to schedule the work and allow an "early" return with the bits required to monitor that work.

We call /prompt but have passed back the file that all the updates (status, and otherwise) will be posted to, but also have a handle on the Task that is doing the actual work.

I thought this was the approach discussed earlier. Are you imagining something different?

Comment on lines +124 to +129
generate_task = self.generator.generate(
    input_file_id=file_id,
    input_file_block_index_list=block_ids,
    options=options,
    append_output_to_file=True,
)
Contributor

If we're going to generate with streaming blocks, we also need to pass streaming=True here. That will result in the generate_task.output.blocks being allocated but not yet completed. This means that the caller then has to be aware of whether or not they're streaming.

See: #548

Contributor Author

Walking through this together is probably the best way to clear up the confusion here. I think we want streaming=True to always be the case, really, but I don't think the caller here cares, because there is a wait() on this task anyway. It is the downstream consumer with access to the File that cares, right?

3 participants