feat(streaming): initial implementation of streaming agentservice execution #554
from steamship.invocable import PackageService, post


class StreamingResponse(BaseModel):
@douglas-reid if you extend BaseModel instead of CamelModel, then the serialization is underscored rather than the camelCase that we use for engine comms.
It sneaks through in this case since it's internal to an object's response, but it breaks the parsing of the task in the TypeScript client.
Would it be possible to add a test that, e.g., response.task.requestId is serialized as requestId instead of request_id, just to make sure that the clients unmarshalling it won't explode?
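A minimal sketch of the kind of test being asked for, using only the standard library. `FakeTask`, `FakeStreamingResponse`, `camelize`, and `serialize` are hypothetical stand-ins invented for illustration; they are not the SDK's `CamelModel`, `Task`, or `StreamingResponse`, and the real test would serialize the actual response object instead.

```python
import json
from dataclasses import asdict, dataclass


def to_camel(name: str) -> str:
    """Convert a snake_case field name to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def camelize(value):
    """Recursively rename dict keys to camelCase, leaving values untouched."""
    if isinstance(value, dict):
        return {to_camel(k): camelize(v) for k, v in value.items()}
    if isinstance(value, list):
        return [camelize(v) for v in value]
    return value


@dataclass
class FakeTask:  # hypothetical stand-in for the real Task
    task_id: str
    request_id: str


@dataclass
class FakeStreamingResponse:  # hypothetical stand-in for StreamingResponse
    task: FakeTask


def serialize(response: FakeStreamingResponse) -> str:
    """Produce the wire JSON with camelCase keys at every nesting level."""
    return json.dumps(camelize(asdict(response)))


def test_request_id_is_camel_cased() -> None:
    wire = json.loads(serialize(FakeStreamingResponse(FakeTask("t-1", "r-1"))))
    # The nested task must expose requestId, not request_id.
    assert "requestId" in wire["task"]
    assert "request_id" not in wire["task"]
```

The point of asserting on the nested `task` object rather than the top level is that the underscore leak described above only shows up one level down.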
eob left a comment
A few non-blocking thoughts in there, but this looks fantastic. Excited for it to be merged!!
assert num_blocks > 0, "Blocks should have been streamed during execution"
assert llm_prompt_event_count == 2, (
    "At least 2 llm prompts should have happened (first for tool selection, "
These error messages are awesome -- thank you for adding them
# if you can't find a consistent context_id, then something has gone wrong, preventing streaming
if not ctx_id:
    # TODO(dougreid): this points to a slight flaw in the context_keys vs. context_id
Is this the situation where in practice we've just been using context_id whereas in theory we're hashing the whole context_keys object?
Yeah. We have a mismatch between our exposed args in /prompt and our actual capability. Not sure if that matters practically yet.
    self, prompt: Optional[str] = None, context_id: Optional[str] = None, **kwargs
) -> StreamingResponse:
    ctx_id, history_file = self._streaming_context_id_and_file(context_id=context_id, **kwargs)
    task = self.invoke_later(
I think in a V2 we'll want to follow up and see if we can have the things that are later-invoked automatically block this task from completing. (Thinking about the comment string about the semantics of the Task below: we could maybe try to make conformance to that automatic.)
This PR presents a series of changes that should support a way to stream response information back to a client via an AgentService. In order to achieve the streaming result, a new method on the AgentService is exposed: `async_prompt`. This new method returns a new `StreamingResponse` that has two fields: `task` and `file`. These fields provide access to (a) the async task that will be streaming results and (b) a file (here the `ChatHistory` file) to which all status messages and assistant interactions will be saved. This PR relies on a full deployment of steamship-core/gpt4#10 to the target environment for testing / validation.
This PR establishes an initial implementation of streaming utilities for `AgentService` endpoints, based on `FunctionsBasedAgent` runs. This will allow clients to invoke an agent run through an async endpoint and stream, via SSE, block creation events related to the agent's execution. These events can include status messages (from Agents and Tools) as well as generated blocks (from LLMs and Tools).

With this code, new async endpoints can be exposed with code like the following:

The PR ensures all `Block`s created in the service of a `run_agent` call are tagged with the proper `request-id`, and that all calls to the LLM are streaming-compatible.

A test is provided that demonstrates an approach to consuming a stream of events, based on the sseclient-py library. A Generator is constructed that emits `Block`s until a terminal block for a request is found in the stream.
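
The generator approach described above might look roughly like the following sketch. It is not the test from this PR: the `Event` class is a stand-in for a parsed SSE event (only its `data` payload is used), the `requestId` and `final` payload fields are invented for illustration, and the terminal-block check is passed in as a predicate because the real criterion is not shown here.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, Iterator


@dataclass
class Event:
    """Stand-in for a parsed SSE event; only the `data` payload is used."""

    data: str


def blocks_until_terminal(
    events: Iterable[Event],
    request_id: str,
    is_terminal: Callable[[Dict[str, Any]], bool],
) -> Iterator[Dict[str, Any]]:
    """Yield decoded blocks for `request_id`, stopping after the terminal one."""
    for event in events:
        if not event.data:
            continue  # skip keep-alive or empty events
        block = json.loads(event.data)
        # Hypothetical payload shape: each block carries its request id.
        if block.get("requestId") != request_id:
            continue
        yield block
        if is_terminal(block):
            return


# Usage with an in-memory stream; the "final" flag is an invented marker.
stream = [
    Event('{"requestId": "r-1", "text": "Selecting tool"}'),
    Event(""),
    Event('{"requestId": "r-2", "text": "other request"}'),
    Event('{"requestId": "r-1", "text": "Done", "final": true}'),
    Event('{"requestId": "r-1", "text": "never reached"}'),
]
received = list(blocks_until_terminal(stream, "r-1", lambda b: b.get("final", False)))
```

Taking any iterable of events keeps the generator independent of the transport, so the same function could be fed by an SSE client in an integration test and by a plain list in a unit test.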