
Connect to GitHub issues #232

Open
camwest opened this issue Apr 27, 2024 · 3 comments

Comments


camwest commented Apr 27, 2024

Hey folks,

I added one of my files to GitHub Sync, and it correctly picked up the primary machine, "app". I then added some context definitions in Stately Studio and tried to sync them back, but the GitHub diff didn't look right.

(Screenshot: CleanShot 2024-04-27 at 14 38 21@2x)

I expected it to update the `schemas` object inside `setup()` with a context definition.

It could be the way my file is structured. See below for the full file.
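For reference, this is roughly the shape I expected the sync to add (illustrative only; the exact JSON Schema that Stately Studio generates may differ, and the property names below are just taken from my machine's context type):

```typescript
// Illustrative sketch only: the rough `schemas.context` entry I expected the
// sync to add inside `setup({ ... })`, mirroring my machine's context type.
const expectedContextSchema = {
  context: {
    type: "object",
    properties: {
      context: { type: "object" },
      contextId: { type: "number" },
      threadId: { type: ["string", "null"] },
      organizationId: { type: "number" },
      orgname: { type: "string" },
      message: { type: "string" },
    },
  },
};
```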

Thank you!

import { QueryContext } from "@/actions/types";
import { getGodClient } from "@/utils/supabase-god";
import { SupabaseClient } from "@supabase/supabase-js";
import { DataMessage } from "ai";
import { Database, encodeSqid, fetchInterviewIdTupleByFileId } from "database";
import OpenAI from "openai";
import { AssistantStream } from "openai/lib/AssistantStream";
import { assign, createActor, createMachine, fromPromise, setup } from "xstate";
import { MessageAnnotation } from "./AssistantResponse2";
import { fetchContext } from "./context";

type Message = OpenAI.Beta.Threads.ThreadCreateAndRunParams.Thread.Message;
type Run = OpenAI.Beta.Threads.Run;

// Helper function to split an array into chunks of the given size
function chunkArray<T>(array: T[], chunkSize: number): T[][] {
  const chunks: T[][] = [];

  for (let index = 0; index < array.length; index += chunkSize) {
    chunks.push(array.slice(index, index + chunkSize));
  }

  return chunks;
}

const OPENAI_CHAT_ASSISTANT_ID = process.env.OPENAI_CHAT_ASSISTANT_ID;

interface AIStreamOutput {
  forwardStream: (stream: AssistantStream) => Promise<Run | undefined>;
  sendDataMessage: (message: DataMessage) => void;
  onAnnotation: (
    fn: (
      index: number,
      annotations: MessageAnnotation,
    ) => Promise<{ newText: string }>,
  ) => void;
}

interface InputProps {
  context: QueryContext;
  contextId: number;
  organizationId: number;
  orgname: string;
  message: string;
}

export function createLLMActor({
  adapters,
  input,
  snapshot,
}: {
  adapters: {
    supabase: SupabaseClient<Database>;
    openai: OpenAI;
  };
  input?: InputProps;
  snapshot?: any;
}) {
  const handleInput = fromPromise<
    void,
    {
      orgname: string;
      threadId: string;
      message: string;
      output: AIStreamOutput;
    }
  >(async ({ input }) => {
    console.log("Handling input");

    const output = input.output;

    if (!OPENAI_CHAT_ASSISTANT_ID) {
      throw new Error("no OPENAI_CHAT_ASSISTANT_ID");
    }

    const runStream = adapters.openai.beta.threads.runs.stream(input.threadId, {
      assistant_id: OPENAI_CHAT_ASSISTANT_ID,
    });

    runStream.on("toolCallCreated", async (toolCall) => {
      output.sendDataMessage({
        role: "data",
        data: {
          type: "toolCall",
          toolCall: toolCall.type,
        },
      });
    });

    output.onAnnotation(async (index, annotation: MessageAnnotation) => {
      // TODO: add some caching
      const dataFromDb = await fetchInterviewIdTupleByFileId(
        // use god client since RLS doesn't allow access to openai tables
        getGodClient(),
        annotation.file_id,
      );

      if (!dataFromDb) {
        throw new Error("expected projectId and interviewId");
      }

      const [projectId, interviewId] = dataFromDb;

      const link = `/orgs/${input.orgname}/interviews/${encodeSqid([projectId, interviewId])}`;

      return {
        // disabled for now since indexes are unreliable: ?start_index=${annotation.start_index}&end_index=${annotation.end_index}
        newText: `[${index}](${link})`,
      };
    });

    console.log("Calling forwardStream with runStream:", runStream);
    // forward run status would stream message deltas
    let runResult = await output.forwardStream(runStream);
    console.log("forwardStream returned:", runResult);

    while (
      runResult?.status === "requires_action" &&
      runResult.required_action?.type === "submit_tool_outputs"
    ) {
      // WE DON'T HAVE ANY TOOLS YET
      const tool_outputs =
        runResult.required_action.submit_tool_outputs.tool_calls.map(
          (toolCall: any) => {
            console.log("toolcall", toolCall);

            let parameters;
            console.log(
              "Attempting to parse JSON: ",
              toolCall.function.arguments,
            );

            try {
              parameters = JSON.parse(toolCall.function.arguments);
            } catch (error) {
              console.error("Error parsing JSON:", error);
              // handle error as appropriate for your application
            }

            switch (toolCall.function.name) {
              default:
                throw new Error(
                  `Unknown tool call function: ${toolCall.function.name}`,
                );
            }
          },
        );

      console.log(
        "Calling submitToolOutputsStream with threadId, runResult.id, and tool_outputs:",
        input.threadId,
        runResult.id,
        tool_outputs,
      );

      runResult = await output.forwardStream(
        adapters.openai.beta.threads.runs.submitToolOutputsStream(
          input.threadId,
          runResult.id,
          {
            tool_outputs,
          },
        ),
      );
      console.log("submitToolOutputsStream returned:", runResult);
    }
  });

  const createThread = fromPromise<
    string,
    {
      context: QueryContext;
      contextId: number;
      organizationId: number;
      message: string;
    }
  >(async ({ input }) => {
    const dataContext = await fetchContext(
      adapters.supabase,
      input.context,
      input.contextId,
    );

    // check the content length and split into multiple messages if needed
    // ensure this value has at most 32768 characters
    const maxContentLength = 32768;

    const content = JSON.stringify(dataContext.context);

    const messages: Message[] = [];

    if (content.length > maxContentLength) {
      const contentChunks: string[] = [];
      const regex = new RegExp(".{1," + maxContentLength + "}", "g");
      let match;
      while ((match = regex.exec(content)) !== null) {
        contentChunks.push(match[0]);
      }
      // an empty array is truthy, so check the length explicitly
      if (contentChunks.length === 0) {
        throw new Error("no content chunks");
      }

      for (const chunk of contentChunks) {
        messages.push({
          role: "user",
          content: chunk,
          metadata: {
            type: "context",
            label: dataContext.label,
            link: dataContext.link,
          },
        });
      }
    } else {
      messages.push({
        role: "user",
        content,
        metadata: {
          type: "context",
          label: dataContext.label,
          link: dataContext.link,
        },
      });
    }

    // Chunk the contextFiles array into chunks of 10
    const fileChunks = chunkArray(dataContext.files, 10);

    // Create a new message for each chunk
    const chunkMessages: Message[] = fileChunks.map((chunk) => {
      return {
        role: "user",
        content: "here are the transcripts",
        metadata: {
          type: "context",
          label: "Transcripts",
        },
        attachments: chunk.map((fileId) => {
          return {
            file_id: fileId,
            tools: [{ type: "file_search" }],
          };
        }),
      };
    });

    console.log("Creating thread with openai");
    const resp = await adapters.openai.beta.threads.create({
      messages: [...messages, ...chunkMessages],
    });

    const threadId = resp.id;

    return threadId;
  });

  const machine = setup({
    types: {
      context: {} as {
        context: QueryContext;
        contextId: number;
        threadId: string | null;
        organizationId: number;
        orgname: string;
        message: string;
      },
      input: {} as InputProps,
      events: {} as
        | { type: "Close" }
        | { type: "New Report" }
        | { type: "Edit Report" }
        | { type: "Show Report" }
        | {
            type: "Handle Input";
            message: string;
            output: AIStreamOutput;
          }
        | { type: "Save Started" }
        | { type: "Save Completed" }
        | { type: "Load Completed" },
    },
    actors: {
      handleInput: handleInput,
      createThread: createThread,

      draftNewReport: createMachine({
        /* ... */
      }),
      editExistingReport: createMachine({
        /* ... */
      }),
    },
    schemas: {
      events: {
        Close: {
          type: "object",
          properties: {},
        },
        "New Report": {
          type: "object",
          properties: {},
        },
        "Edit Report": {
          type: "object",
          properties: {},
        },
        "Show Report": {
          type: "object",
          properties: {},
        },
        "Handle Input": {
          type: "object",
          properties: {},
        },
        "Save Started": {
          type: "object",
          properties: {},
        },
        "Save Completed": {
          type: "object",
          properties: {},
        },
        "Load Completed": {
          type: "object",
          properties: {},
        },
      },
    },
  }).createMachine({
    context: ({ input }) => ({
      context: input.context,
      contextId: input.contextId,
      organizationId: input.organizationId,
      orgname: input.orgname,
      message: input.message,
      threadId: null,
    }),
    id: "app",
    type: "parallel",
    states: {
      Chat: {
        initial: "Creating Thread",
        states: {
          "Creating Thread": {
            invoke: {
              id: "createThread",
              src: "createThread",
              input: ({ context }) => ({
                context: context.context,
                contextId: context.contextId,
                organizationId: context.organizationId,
                message: context.message,
              }),

              onDone: {
                target: "Waiting For Input",
                actions: assign({ threadId: ({ event }) => event.output }),
              },
            },
          },

          "Waiting For Input": {
            on: {
              "Handle Input": {
                target: "#app.Chat.Processing.Processing Output",
              },
            },
          },

          Processing: {
            initial: "Processing Output",
            states: {
              "Processing Output": {
                on: {
                  "New Report": {
                    target: "#app.Chat.Processing.Report Tool.Draft New Report",
                  },
                  "Edit Report": {
                    target:
                      "#app.Chat.Processing.Report Tool.Edit Existing Report",
                  },
                  "Show Report": {
                    target: "#app.Chat.Waiting For Input",
                  },
                },
                invoke: {
                  id: "handleInput",
                  src: "handleInput",
                  input: ({ context, event }) => {
                    if (context.threadId === null) {
                      throw new Error("threadId is null");
                    }

                    if (event.type === "Handle Input") {
                      return {
                        threadId: context.threadId,
                        message: event.message,
                        output: event.output,
                        orgname: context.orgname,
                      };
                    } else {
                      throw new Error("Invalid event type");
                    }
                  },
                  onDone: {
                    target: "#app.Chat.Waiting For Input",
                  },
                },
              },
              "Report Tool": {
                initial: "Draft New Report",
                states: {
                  "Draft New Report": {
                    invoke: {
                      id: "reportGenerator",
                      input: {},
                      onDone: {
                        target: "#app.Chat.Waiting For Input",
                      },
                      src: "draftNewReport",
                    },
                  },
                  "Edit Existing Report": {
                    invoke: {
                      id: "reportGenerator",
                      input: {},
                      onDone: {
                        target: "#app.Chat.Waiting For Input",
                      },
                      src: "editExistingReport",
                    },
                  },
                },
              },
            },
          },
        },
      },
      "Tool Window": {
        initial: "No Tool",
        on: {
          "New Report": {
            target: "#app.Tool Window.Report Tool.Viewing Unsaved Report",
          },
          "Show Report": {
            target: "#app.Tool Window.Report Tool.Loading Version",
          },
          "Edit Report": {
            target: "#app.Tool Window.Report Tool.Viewing Unsaved Report",
          },
        },
        states: {
          "No Tool": {},
          "Report Tool": {
            initial: "Viewing Unsaved Report",
            on: {
              Close: {
                target: "No Tool",
              },
            },
            states: {
              "Viewing Unsaved Report": {
                on: {
                  "Save Started": {
                    target: "Saving New Version",
                  },
                },
              },
              "Saving New Version": {
                on: {
                  "Save Completed": {
                    target: "Viewing Saved Report",
                  },
                },
              },
              "Viewing Saved Report": {},
              "Loading Version": {
                on: {
                  "Load Completed": {
                    target: "Viewing Saved Report",
                  },
                },
              },
            },
          },
        },
      },
    },
  });

  if (snapshot) {
    return createActor(machine, {
      input,
      snapshot,
    });
  } else {
    return createActor(machine, {
      input,
    });
  }
}

camwest commented Apr 27, 2024

I'm also curious why the inline `assign` was changed.

@davidkpiano
Member

I'll investigate this; thanks for the detailed report.


camwest commented Apr 27, 2024

I appreciate it!
