import { serve } from "@upstash/workflow/nextjs"
import {
downloadData,
aggregateResults,
generateReport,
sendReport,
getDatasetUrl,
splitIntoChunks,
} from "./utils"
// Minimal shape of an OpenAI chat-completion response body: we only rely on
// `choices[n].message.content` (and `role`) from the full API payload.
type OpenAiResponse = {
  choices: Array<{
    message: {
      role: string
      content: string
    }
  }>
}
/**
 * Workflow endpoint: downloads a dataset, summarizes it chunk-by-chunk with
 * OpenAI, periodically aggregates the summaries, then generates and sends a
 * report to the requesting user.
 *
 * Payload: `{ datasetId, userId }`.
 *
 * NOTE on replay semantics: Upstash re-invokes this function after each step
 * completes. Completed `context.run` / `context.call` steps return their
 * memoized results WITHOUT re-executing their callbacks, while plain code
 * between steps re-executes on every invocation. Any mutation of local state
 * must therefore happen outside step callbacks to survive replays.
 */
export const { POST } = serve<{ datasetId: string; userId: string }>(
  async (context) => {
    const request = context.requestPayload

    // Step 1: Resolve the dataset URL in its own step so the result is
    // memoized across replays.
    const datasetUrl = await context.run("get-dataset-url", async () => {
      return await getDatasetUrl(request.datasetId)
    })

    // context.call performs the HTTP request outside the serverless function,
    // so it is not bound by the function's timeout (up to 2 hrs).
    const { body: dataset } = await context.call("download-dataset", {
      url: datasetUrl,
      method: "GET"
    })

    // Step 2: Process data in fixed-size chunks using OpenAI.
    const chunkSize = 1000
    const chunks = splitIntoChunks(dataset, chunkSize)
    // Buffer of summaries not yet folded into an aggregate. Rebuilt
    // deterministically on every (re)invocation from memoized step results.
    const processedChunks: string[] = []

    for (let i = 0; i < chunks.length; i++) {
      const { body: processedChunk } = await context.api.openai.call(
        `process-chunk-${i}`,
        {
          // NOTE(review): assumes OPENAI_API_KEY is set in the deployment
          // environment — an unset key fails at the provider, not here.
          token: process.env.OPENAI_API_KEY,
          operation: "chat.completions.create",
          body: {
            model: "gpt-4",
            messages: [
              {
                role: "system",
                content:
                  "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
              },
              {
                role: "user",
                content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`,
              },
            ],
            max_completion_tokens: 150,
          },
        }
      )

      // Fail loudly instead of a non-null assertion: a completion can come
      // back without content (refusal / truncation), and `!` would let an
      // undefined value flow silently into the aggregate.
      const content = processedChunk.choices[0]?.message.content
      if (content == null) {
        throw new Error(`OpenAI returned no content for chunk ${i}`)
      }
      processedChunks.push(content)

      // Every 10 chunks (and at the final chunk) aggregate intermediate results.
      if (i % 10 === 9 || i === chunks.length - 1) {
        // Step name now hyphen-separated ("aggregate-results-9"), consistent
        // with "process-chunk-9" et al.
        await context.run(`aggregate-results-${i}`, async () => {
          await aggregateResults(processedChunks)
        })
        // BUGFIX: reset the buffer OUTSIDE the step callback. The callback is
        // skipped on replay (its result is memoized), so a reset inside it
        // never ran again while the pushes above DID re-run — already
        // aggregated chunks leaked into the next aggregation batch. Out here
        // the reset re-executes on every invocation, mirroring the pushes.
        processedChunks.length = 0
      }
    }

    // Step 3: Generate and send the data report.
    const report = await context.run("generate-report", async () => {
      return await generateReport(request.datasetId)
    })

    await context.run("send-report", async () => {
      await sendReport(report, request.userId)
    })
  }
)