This is the Python SDK to build real-time continuously running unstructured data processing pipelines with Indexify.
Start by writing and testing your pipelines locally using your data, then deploy them into the Indexify service to process data in real-time at scale.
pip install indexify- Extracts text, tables and images from an ingested PDF file
- Indexes the text using MiniLM-L6-v2, the images with CLIP
- Writes the results into a vector database.
Youtube Transcription Summarizer
- Downloads Youtube Video
- Extracts audio from the video and transcribes using
Faster Whisper - Uses Llama 3.1 backed by
Llama.cppto understand and classify the nature of the video. - Routes the transcription dynamically to one of the transcription summarizer to retain specific summarization attributes.
- Finally the entire transcription is embedded and stored in a vector database for retrieval.
- Write data processing functions in Python and use Pydantic objects for returning complex data types from functions
- Connect functions using a graph interface. Indexify automatically stores function outputs and passes them along to downstream functions.
- If a function returns a list, the downstream functions will be called with each item in the list in parallel.
- The input of the first function becomes the input to the HTTP endpoint of the Graph.
- There is NO limit to volume of data being ingested since we use blob stores for storing metadata and objects
- The server can handle 10s of 1000s of files being ingested into the graphs in parallel.
- The scheduler reacts under 8 microseconds to ingestion events, so it's suitable for workflows which needs to run in realtime.
- Batch ingestion is handled gracefully by batching ingested data and scheduling for high throughput in production settings.
from pydantic import BaseModel
from indexify import indexify_function
from typing import Dict, Any, Optional, List
# Define function inputs and outputs
class Document(BaseModel):
text: str
metadata: Dict[str, Any]
class TextChunk(BaseModel):
text: str
metadata: Dict[str, Any]
embedding: Optional[List[float]] = None
# Decorate a function which is going to be part of your data processing graph
@indexify_function()
def split_text(doc: Document) -> List[TextChunk]:
midpoint = len(doc.text) // 2
first_half = TextChunk(text=doc.text[:midpoint], metadata=doc.metadata)
second_half = TextChunk(text=doc.text[midpoint:], metadata=doc.metadata)
return [first_half, second_half]
# Any requirements specified is automatically installed in production clusters
@indexify_function(requirements=["langchain_text_splitter"])
def compute_embedding(chunk: TextChunk) -> TextChunk:
chunk.embedding = [0.1, 0.2, 0.3]
return chunk
# You can constrain functions to run on specific executors
@indexify_function(executor_runtime_name="postgres-driver-image")
def write_to_db(chunk: TextChunk):
# Write to your favorite vector database
...
## Create a graph
from indexify import Graph
g = Graph(name="my_graph", start_node=split_text)
g.add_edge(split_text, compute_embedding)
g.add_edge(embed_text, write_to_db)Every time the Graph is invoked, Indexify will provide an Invocation Id which can be used to know about the status of the processing and any outputs from the Graph.
from indexify import IndexifyClient
client = IndexifyClient(local=True)
client.register_graph(g)
invocation_id = client.invoke_graph_with_object(g.name, Document(text="Hello, world!", metadata={"source": "test"}))
graph_outputs = client.graph_outputs(g.name, invocation_id)Work In Progress - The version of server that works with python based graphs haven't been released yet. It will be shortly released. Join discord for development updates.
from indexify import IndexifyClient
client = IndexifyClient(service_url="http://localhost:8900")
client.register_graph(g)Extraction Graphs continuously run on the Indexify Service like any other web service. Indexify Server runs the extraction graphs in parallel and in real-time when new data is ingested into the service.
output_id = client.invoke_graph_with_object(g.name, Document(text="Hello, world!", metadata={"source": "test"}))graph_outputs = client.graph_outputs(g.name, output_id)graph_inputs = client.graph_inputs(g.name)