"""FastAPI app serving a Vocode Twilio telephony router and a ``/stream`` SSE chat endpoint.

Serve ``app`` with an ASGI server (e.g. uvicorn). If ``BASE_URL`` is not set,
an ngrok tunnel is opened to the port given by ``--port`` (default 6789).
"""
import asyncio
import logging
import os
import re
import sys
from typing import (
    AsyncGenerator,
    AsyncIterable,
    Awaitable,
    Optional,
    Sequence,
    Tuple,
)

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from pydantic import BaseModel
from pyngrok import ngrok
from vocode.streaming.agent.base_agent import BaseAgent, RespondAgent
from vocode.streaming.models.agent import AgentConfig, AgentType, ChatGPTAgentConfig
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.models.synthesizer import ElevenLabsSynthesizerConfig
from vocode.streaming.models.telephony import TwilioConfig
from vocode.streaming.telephony.config_manager.redis_config_manager import (
    RedisConfigManager,
)
from vocode.streaming.telephony.server.base import (
    TelephonyServer,
    TwilioCallConfig,
    TwilioInboundCallConfig,
)

from speller_agent import SpellerAgentConfig, SpellerAgentFactory

# If running from python, this will load the local .env;
# docker-compose will load the .env file by itself.
load_dotenv()

app = FastAPI()

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

config_manager = RedisConfigManager()

# Port tunnelled by ngrok when no --port argument is given.
DEFAULT_PORT = 6789

# SSE line terminators: CRLF, lone CR, or lone LF.
_SSE_LINE_BREAK = re.compile(r"\r\n|\r|\n")


def _port_from_argv(argv: Sequence[str], default: int = DEFAULT_PORT) -> int:
    """Return the port given as ``--port N`` or ``--port=N`` in *argv*.

    Falls back to *default* when the flag is absent, has no value, or its
    value is not an integer.
    """
    for i, arg in enumerate(argv):
        if arg == "--port":
            value = argv[i + 1] if i + 1 < len(argv) else None
        elif arg.startswith("--port="):
            value = arg.partition("=")[2]
        else:
            continue
        try:
            return int(value)
        except (TypeError, ValueError):
            return default
    return default


BASE_URL = os.getenv("BASE_URL")

if not BASE_URL:
    ngrok_auth = os.environ.get("NGROK_AUTH_TOKEN")
    if ngrok_auth is not None:
        ngrok.set_auth_token(ngrok_auth)
    port = _port_from_argv(sys.argv)

    # Open a ngrok tunnel to the dev server; the scheme is stripped so that
    # base_url is passed to TelephonyServer as a bare host.
    BASE_URL = ngrok.connect(port).public_url.replace("https://", "")
    logger.info('ngrok tunnel "%s" -> "http://127.0.0.1:%s"', BASE_URL, port)

if not BASE_URL:
    raise ValueError("BASE_URL must be set in environment if not using pyngrok")

telephony_server = TelephonyServer(
    base_url=BASE_URL,
    config_manager=config_manager,
    inbound_call_configs=[
        TwilioInboundCallConfig(
            url="/inbound_call",
            # agent_config=ChatGPTAgentConfig(
            #     initial_message=BaseMessage(text="What up."),
            #     prompt_preamble="Act as a customer talking to 'Cosmos', a pizza establisment ordering a large pepperoni pizza for pickup. If asked for a name, your name is 'Hunter McRobie', and your credit card number is 4743 2401 5792 0539 CVV: 123 and expiratoin is 10/25. If asked for numbers, say them one by one",#"Have a polite conversation about life while talking like a pirate.",
            #     generate_responses=True,
            #     model_name="gpt-3.5-turbo"
            # ),
            agent_config=SpellerAgentConfig(
                generate_responses=False,
                initial_message=BaseMessage(text="What up."),
            ),
            twilio_config=TwilioConfig(
                account_sid=os.environ["TWILIO_ACCOUNT_SID"],
                auth_token=os.environ["TWILIO_AUTH_TOKEN"],
            ),
            synthesizer_config=ElevenLabsSynthesizerConfig.from_telephone_output_device(
                api_key=os.getenv("ELEVENLABS_API_KEY"),
                # NOTE(review): "YOUR VOICE ID" looks like a placeholder
                # env-var name (it contains spaces) — confirm the intended key.
                voice_id=os.getenv("YOUR VOICE ID"),
            ),
        )
    ],
    agent_factory=SpellerAgentFactory(),
    logger=logger,
)


async def _wrap_done(fn: Awaitable, event: asyncio.Event) -> None:
    """Await *fn*, then set *event* whether it succeeded, raised, or was cancelled.

    Exceptions are logged rather than propagated, so the streaming response
    simply ends instead of failing mid-stream.
    """
    try:
        await fn
    except Exception:
        logger.exception("Streaming chat completion failed")
    finally:
        # Signal the callback's aiter() to stop.
        event.set()


def _format_sse(data: str) -> str:
    """Encode *data* as one server-sent-events message.

    Each line of *data* gets its own ``data:`` field, so tokens containing
    line breaks are not turned into malformed fields; a blank line ends the
    event.
    """
    lines = _SSE_LINE_BREAK.split(data)
    return "".join(f"data: {line}\n" for line in lines) + "\n"


async def send_message(message: str) -> AsyncGenerator[str, None]:
    """Stream a ChatOpenAI completion for *message* as server-sent events.

    Yields one SSE-formatted chunk per token received by the callback
    handler. If the consumer stops iterating early (e.g. the client
    disconnects), the background generation task is cancelled.
    """
    callback = AsyncIteratorCallbackHandler()
    model = ChatOpenAI(
        streaming=True,
        verbose=True,
        callbacks=[callback],
    )

    # Begin a task that runs in the background; tokens arrive via `callback`.
    task = asyncio.create_task(
        _wrap_done(
            model.agenerate(messages=[[HumanMessage(content=message)]]),
            callback.done,
        )
    )
    try:
        async for token in callback.aiter():
            # Use server-sent-events to stream the response
            yield _format_sse(token)
        await task
    finally:
        # Only still running if the generator was closed/cancelled early.
        if not task.done():
            task.cancel()


class StreamRequest(BaseModel):
    """Request body for streaming."""

    message: str


@app.post("/stream")
async def stream(body: StreamRequest):
    """Stream the model's reply to ``body.message`` as ``text/event-stream``."""
    return StreamingResponse(
        send_message(body.message),
        media_type="text/event-stream",
    )


app.include_router(telephony_server.get_router())