Examples

Real-world examples showing how to instrument common ML pipeline patterns with X-Ray.

Use Cases

1. RAG Document Retrieval — Retrieval + LLM

A typical RAG pipeline that retrieves documents, filters by relevance, and generates an answer. X-Ray tracks which documents made it to the LLM context.

What to debug

  • Which documents were retrieved but filtered out
  • Relevance scores and drop reasons
  • Prompts sent to the LLM and responses

Code

import xray

xray.init(api_key="your-api-key")

@xray.pipeline("rag-retrieval")
def answer_question(question: str):
    """Answer *question* via retrieve -> filter -> generate, traced by X-Ray.

    Returns the generated answer string from the LLM step.
    """
    xray.tag("question_length", len(question))

    docs = retrieve_docs(question)
    # filter_relevance is a generator; materialize it so generate_answer
    # can both join the documents into a prompt and call len() on them
    # (len() on a generator raises TypeError).
    relevant = list(filter_relevance(docs, question))
    answer = generate_answer(question, relevant)

    return answer

@xray.step("RETRIEVE")
def retrieve_docs(question: str):
    """Embed *question* and pull up to 100 candidate documents from the vector DB."""
    query_vector = embed(question)
    xray.metric("embedding_model", "text-embedding-3-small")
    return vector_db.search(query_vector, limit=100)

@xray.step("FILTER")
def filter_relevance(docs, question):
    """Yield docs scoring >= 0.3 against *question* that are not outdated.

    Each rejected doc is recorded with a drop reason so X-Ray can explain
    why it never reached the LLM context.
    """
    for candidate in docs:
        relevance = cross_encoder.score(question, candidate.text)
        xray.score(candidate, relevance)

        if relevance < 0.3:
            xray.drop(candidate, "low_relevance")
        elif candidate.is_outdated:
            xray.drop(candidate, "outdated_content")
        else:
            yield candidate

@xray.step("LLM_CALL")
def generate_answer(question: str, docs: list):
    """Build a context prompt from *docs* and ask GPT-4 to answer *question*.

    Accepts any iterable of docs (filter_relevance yields a generator);
    it is materialized exactly once so it can be both joined into the
    prompt and counted — len() on a generator would raise TypeError.
    """
    docs = list(docs)
    context = "\n".join(d.text for d in docs)
    prompt = f"Context:\n{context}\n\nQuestion: {question}"

    xray.artifact("prompt", prompt)
    xray.metric("context_docs", len(docs))

    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )

    answer = response.choices[0].message.content
    xray.artifact("response", answer)

    return answer
2. E-commerce Product Search — Multi-stage ranking

A multi-stage search pipeline with retrieval, ranking, filtering, and personalization. Track why products don't appear in results.

What to debug

  • Products filtered due to stock or geo restrictions
  • Ranking scores and how they affect final order
  • User personalization impact

Code

import xray

@xray.pipeline("product-search")
def search_products(query: str, user_id: str):
    """Run the retrieve -> rank -> filter -> personalize search pipeline.

    Returns the top 10 personalized products for *user_id*.
    """
    xray.tag("user_id", user_id)
    xray.tag("query", query)

    candidates = retrieve(query)
    scored = rank(candidates, query)
    # apply_filters is a generator; materialize it so downstream steps
    # (personalize, final slicing) can safely treat the result as a sequence.
    filtered = list(apply_filters(scored))
    personalized = personalize(filtered, user_id)

    return personalized[:10]

@xray.step("RETRIEVE")
def retrieve(query: str):
    """Fetch up to 1000 candidate products for *query* from Elasticsearch."""
    xray.metric("index", "products-v2")
    return elasticsearch.search(query, limit=1000)

@xray.step("RANK")
def rank(items, query):
    """Score every item against *query* and return them best-first.

    Side effect: each item's ``score`` attribute is set to the model's
    predicted relevance before sorting.
    """
    for candidate in items:
        predicted = ranking_model.predict(candidate, query)
        xray.score(candidate, predicted)
        candidate.score = predicted
    return sorted(items, key=lambda candidate: candidate.score, reverse=True)

@xray.step("FILTER")
def apply_filters(items):
    """Yield items that are in stock, unrestricted, and score >= 0.1.

    Every rejected item is recorded with its drop reason so X-Ray can
    show why a product was excluded from the results.
    """
    for product in items:
        if not product.in_stock:
            xray.drop(product, "out_of_stock")
        elif product.is_restricted:
            xray.drop(product, "geo_restricted")
        elif product.score < 0.1:
            xray.drop(product, "below_threshold")
        else:
            yield product
3. AI Agent Tool Selection — Tool routing

An agent that selects and executes tools. X-Ray tracks which tools were considered and why some were rejected.

What to debug

  • Tool relevance scores for each request
  • Why certain tools were not selected
  • Tool inputs, outputs, and execution times

Code

import xray

@xray.pipeline("customer-support-agent")
def handle_request(user_message: str):
    """Route *user_message* through candidate tools until one completes.

    Falls back to a canned response when no tool yields a complete result.
    """
    xray.tag("message_type", classify_intent(user_message))

    # Try each selected tool in order; stop at the first complete result.
    for candidate_tool in select_tools(user_message):
        outcome = execute_tool(candidate_tool, user_message)
        if outcome.is_complete:
            return format_response(outcome)

    return fallback_response()

@xray.step("SELECT")
def select_tools(message: str):
    """Yield available tools whose relevance to *message* is >= 0.3.

    Rejected tools are recorded with their drop reason for debugging.
    """
    for candidate in get_available_tools():
        relevance = candidate.compute_relevance(message)
        xray.score(candidate, relevance)

        if relevance < 0.3:
            xray.drop(candidate, "low_relevance")
        elif not candidate.is_available():
            xray.drop(candidate, "service_unavailable")
        else:
            yield candidate

@xray.step("TOOL_CALL")
def execute_tool(tool, message: str):
    """Run *tool* on *message*, recording its input, output, and latency."""
    xray.metric("tool_name", tool.name)
    xray.artifact("tool_input", {"message": message})

    outcome = tool.execute(message)

    xray.artifact("tool_output", outcome.to_dict())
    xray.metric("execution_time_ms", outcome.duration_ms)

    return outcome
4. Content Moderation — Safety filters

A moderation pipeline that checks content against multiple filters. Track why content is flagged or allowed.

What to debug

  • Which blocklist words triggered flags
  • Toxicity scores from the model
  • False positives and negatives

Code

import xray

@xray.pipeline("content-moderation")
def moderate_content(content: str, content_type: str):
    """Decide whether *content* is allowed.

    Returns a dict with ``allowed`` and, when blocked, a ``reason``.
    Checks run cheapest-first: blocklist before the toxicity model.
    """
    xray.tag("content_type", content_type)
    xray.tag("content_length", len(content))

    if check_blocklist(content):
        return {"allowed": False, "reason": "blocklist_match"}

    if check_toxicity(content) > 0.8:
        return {"allowed": False, "reason": "high_toxicity"}

    return {"allowed": True}

@xray.step("FILTER")
def check_blocklist(content: str):
    """Return True when any BLOCKLIST word occurs in *content* (case-insensitive).

    The first matching word is recorded as the drop reason, along with a
    100-character preview of the content.
    """
    lowered = content.lower()
    for word in BLOCKLIST:
        if word in lowered:
            xray.drop({"content": content[:100]}, f"blocklist:{word}")
            return True
    return False

@xray.step("LLM_CALL")
def check_toxicity(content: str):
    """Score *content* with the toxicity model and return the numeric score."""
    xray.artifact("input", content)

    verdict = toxicity_model.predict(content)

    xray.metric("toxicity_score", verdict.score)
    xray.metric("categories", verdict.flagged_categories)

    return verdict.score
    return result.score