Bug on pending promises: Resonate server completely fails (it neither stops the `serve` process nor processes the promise)
Expected Behavior
1 - Start the worker, start Resonate and start the server.
2 - If there are pending promises, the Resonate server calls the worker.
3 - The worker calls the Resonate server to claim the task; this alone should just work.
Actual Behavior
1 - Start the worker, start Resonate and start the server.
2 - If there are pending promises, the Resonate server calls the worker.
3 - The worker calls the Resonate server to claim the task.
4 - The fetch request that claims the task fails and everything freezes.
To Reproduce
```shell
$ pnpm init
$ pnpm i @resonatehq/sdk express
$ pnpm install --save-dev @types/express @types/node ts-node typescript
```
- Create a `src` folder and add both an `index.ts` and a `worker.ts` file to it.
- Contents of index.ts:
```ts
import express, { Request, Response } from "express";
import { Resonate } from "@resonatehq/sdk/dist/async";
import { Context } from "@resonatehq/sdk";

/**
 * Asynchronously downloads and summarizes the text from a given URL.
 *
 * @param context - The context object provided by @resonatehq/sdk, used to run other asynchronous operations.
 * @param url - The URL of the page to download and summarize.
 * @returns A promise that resolves to a summary of the downloaded page's text.
 */
export async function downloadAndSummarize(context: Context, url: string) {
  // Summarize the content on a node with a gpu
  console.log("url", url);
  let summary = await context.run(`/gpu/summarize/summarize-${url}`, url);
  // Return the summary of the content
  return summary;
}

// Initialize a Resonate application.
const resonate = new Resonate({ url: "http://localhost:8001" });

// Register a function as a Resonate function
resonate.register(
  "downloadAndSummarize",
  downloadAndSummarize,
  resonate.options({ timeout: Number.MAX_SAFE_INTEGER })
);

// Start the Resonate application
resonate.start();

// Initialize an Express application.
const app = express().use(express.json());

// Register a function as an Express endpoint
app.post("/summarize", async (req: Request, res: Response) => {
  const url = req.body?.url;
  try {
    // Call the resonate function
    let summary = await resonate.run(
      "downloadAndSummarize",
      /* id */ `summarize-${url}`,
      /* param */ url
    );
    res.send(summary);
  } catch (e) {
    res.status(500).send("An error occurred.");
  }
});

// Start the Express application
app.listen(3000, () => {
  console.log("Listening on port 3000");
});
```
- Contents of worker.ts:
```ts
import express, { Request, Response } from "express";

const app = express();
app.use(express.json());

app.post("/", (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch((error) =>
      console.error("Error while processing task", error)
    );
    res.status(200);
  } catch (e) {
    console.error("Error", e);
    res.status(500);
  }
});

async function process(request: any): Promise<void> {
  const {
    taskId,
    counter,
    links: { claim: claimUrl, complete: completeUrl },
  } = request;
  console.log("Claiming task", taskId, counter);

  // Claim the task
  let response = await fetch(claimUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      taskId,
      counter,
      processId: "process-id",
      executionId: "execution-id",
      expiryInMilliseconds: 60,
    }),
  });
  if (!response.ok) {
    throw new Error(`Error claiming task: ${response}`);
  }
  let data = await response.json();
  console.log("Task claimed", response);

  // BUSINESS LOGIC
  let summary = "This is a summary of the text";
  // BUSINESS LOGIC

  // Complete the task
  console.log("Completing task", taskId, counter);
  response = await fetch(completeUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      taskId,
      counter,
      executionId: "execution-id",
      state: "resolved",
      value: {
        headers: { "Content-Type": "application/json" },
        data: Buffer.from(JSON.stringify(summary)).toString("base64"),
      },
    }),
  });
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`);
  }
  data = await response.json();
  console.log("Task completed", taskId, counter);
}

const PORT = 5001;
app.listen(PORT, () => {
  console.log(`Starting worker on port ${PORT}\n`);
});
```
- Add this to your `package.json`:
```json
{
  "scripts": {
    "dev": "ts-node src/index.ts",
    "worker": "ts-node src/worker.ts"
  }
}
```
- Run each command in a separate terminal, or use something like `concurrently`:
```shell
$ pnpm run dev
$ pnpm run worker
$ resonate serve
```
- Press Ctrl + C in the `pnpm run dev` terminal to stop it completely, leaving the Resonate server and the worker running.
- Change both lines in `index.ts`:
```diff
 export async function downloadAndSummarize(context: Context, url: string) {
   // Summarize the content on a node with a gpu
   console.log("url", url);
-  let summary = await context.run(`/gpu/summarize/summarize-${url}`, url);
+  let summary = await context.run(`/gpu/summarize/summarize1-${url}`, url);
   // Return the summary of the content
   return summary;
 }
```
and
```diff
 app.post("/summarize", async (req: Request, res: Response) => {
   const url = req.body?.url;
   try {
     // Call the resonate function
     let summary = await resonate.run(
       "downloadAndSummarize",
-      /* id */ `summarize-${url}`,
+      /* id */ `summarize1-${url}`,
       /* param */ url
     );
     res.send(summary);
   } catch (e) {
     res.status(500).send("An error occurred.");
   }
 });
```
- Rerun `pnpm run dev`.
- Curl the API again with:
```shell
$ curl \
  -X POST \
  -H 'Content-Type: application/json' \
  -d '{"url": "http://example.com"}' \
  http://localhost:3000/summarize
```
- Repeat steps 6, 7 and 8, changing `summarize1` to `summarize2`, ..., `summarizeN`. Eventually the server freezes completely: it stops sending tasks to the worker, so they pile up.
- Once it no longer sends tasks to the worker, fully stop the Resonate server and the worker.
- Run them again: first the worker, then the server.
- You'll get the following on the worker console:
Task received {
queue: 'analytics',
taskId: '/gpu/summarize/summarize4-http://example.com',
counter: 763,
links: {
claim: 'http://localhost:8001/tasks/claim',
complete: 'http://localhost:8001/tasks/complete'
}
}
Claiming task /gpu/summarize/summarize4-http://example.com 763
Error while processing task TypeError: fetch failed
at Object.fetch (node:internal/deps/undici/undici:11576:11)
at async process (/Users/nicolasmelo/workspace/resonate-world/src/worker.ts:30:18) {
cause: SocketError: other side closed
at Socket.onSocketEnd (node:internal/deps/undici/undici:9790:26)
at Socket.emit (node:events:526:35)
at Socket.emit (node:domain:489:12)
at endReadableNT (node:internal/streams/readable:1376:12)
at processTicksAndRejections (node:internal/process/task_queues:82:21) {
code: 'UND_ERR_SOCKET',
socket: {
localAddress: '::1',
localPort: 50692,
remoteAddress: '::1',
remotePort: 8001,
remoteFamily: 'IPv6',
timeout: undefined,
bytesWritten: 387,
bytesRead: 0
}
}
}
And on the Resonate server you'll see:
time=2024-04-25T10:43:20.807-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.858-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.860-03:00 level=ERROR msg="connection http::http://localhost:5001 failed to complete task with error: \"Post \\\"http://localhost:5001\\\": dial tcp [::1]:5001: connect: connection refused\""
time=2024-04-25T10:43:20.860-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.906-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.908-03:00 level=ERROR msg="connection http::http://localhost:5001 failed to complete task with error: \"Post \\\"http://localhost:5001\\\": dial tcp [::1]:5001: connect: connection refused\""
time=2024-04-25T10:43:20.908-03:00 level=INFO msg="connection http::http://localhost:5001 submitted task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:20.951-03:00 level=INFO msg="connection http::http://localhost:5001 received task /gpu/summarize/summarize4-http://example.com"
time=2024-04-25T10:43:21.009-03:00 level=INFO msg="http: panic serving [::1]:50692: unknown status code 4035\ngoroutine 8546 [running]:\nnet/http.(*conn).serve.func1()\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:1868 +0xb0\npanic({0x102c90680?, 0x14000131c90?})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/runtime/panic.go:920 +0x26c\ngithub.com/resonatehq/resonate/internal/kernel/t_api.ResponseStatus.String(0x30?)\n\t/Users/runner/work/resonate/resonate/internal/kernel/t_api/status.go:70 +0x228\ngithub.com/resonatehq/resonate/internal/api.HandleRequestError(0x14000298500?)\n\t/Users/runner/work/resonate/resonate/internal/api/errors.go:97 +0x28\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/service.(*Service).ClaimTask(0x140003561a0, 0x1400079fdd0, 0x1400016e040)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/service/task.go:53 +0x274\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).claimTask(0x140000b84e0, 0x140007ea300)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/task.go:27 +0xac\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).log(0x140003dac60?, 0x140007ea300)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/http.go:95 +0x48\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/gin-gonic/gin.(*Engine).handleHTTPRequest(0x140004eed00, 0x140007ea300)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:620 +0x524\ngithub.com/gin-gonic/gin.(*Engine).ServeHTTP(0x140004eed00, {0x102df6190?, 0x140000ce2a0}, 0x1400013ef00)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:576 +0x1a0\nnet/http.serverHandler.ServeHTTP({0x102df3740?}, {0x102df6190?, 0x140000ce2a0?}, 0x6?)\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2938 +0xbc\nnet/http.(*conn).serve(0x14000174900, {0x102df8040, 0x140006181b0})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2009 +0x518\ncreated by net/http.(*Server).Serve in goroutine 50\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:3086 +0x4cc"
- The Resonate server freezes completely. When you press Ctrl + C, it logs:
time=2024-04-25T10:46:30.856-03:00 level=INFO msg="http: panic serving [::1]:50901: system is shutting down\ngoroutine 98520 [running]:\nnet/http.(*conn).serve.func1()\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:1868 +0xb0\npanic({0x102d1ef60?, 0x140000e6b40?})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/runtime/panic.go:920 +0x26c\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).searchPromises(0x140000b84e0, 0x140007ea000)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/promise.go:65 +0x358\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/resonatehq/resonate/internal/app/subsystems/api/http.(*server).log(0x140003daa00?, 0x140007ea000)\n\t/Users/runner/work/resonate/resonate/internal/app/subsystems/api/http/http.go:95 +0x48\ngithub.com/gin-gonic/gin.(*Context).Next(...)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/context.go:174\ngithub.com/gin-gonic/gin.(*Engine).handleHTTPRequest(0x140004eed00, 0x140007ea000)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:620 +0x524\ngithub.com/gin-gonic/gin.(*Engine).ServeHTTP(0x140004eed00, {0x102df6190?, 0x14000228620}, 0x14000258500)\n\t/Users/runner/go/pkg/mod/github.com/gin-gonic/gin@v1.9.1/gin.go:576 +0x1a0\nnet/http.serverHandler.ServeHTTP({0x140007bc090?}, {0x102df6190?, 0x14000228620?}, 0x6?)\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2938 +0xbc\nnet/http.(*conn).serve(0x14000422000, {0x102df8040, 0x140006181b0})\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:2009 +0x518\ncreated by net/http.(*Server).Serve in goroutine 50\n\t/Users/runner/hostedtoolcache/go/1.21.9/x64/src/net/http/server.go:3086 +0x4cc"
but the process never actually stops.
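The "Task received" message in the worker log above has a fixed shape, yet the worker destructures `req.body` without checking it. A minimal TypeScript sketch of a runtime guard for that shape follows. `TaskMessage`, `isRecord` and `isTaskMessage` are hypothetical names, and the field list is inferred from `worker.ts` and the logged message, not taken from an official Resonate type.

```typescript
// Shape of the message the server POSTs to the worker, inferred from worker.ts
// and the "Task received" log in this report (assumption: not an official type).
interface TaskMessage {
  queue: string;
  taskId: string;
  counter: number;
  links: { claim: string; complete: string };
}

// True for any non-null object, so its properties can be inspected safely.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Runtime check so a malformed body can be rejected before process() runs.
function isTaskMessage(body: unknown): body is TaskMessage {
  if (!isRecord(body)) return false;
  const links = body.links;
  if (!isRecord(links)) return false;
  return (
    typeof body.queue === "string" &&
    typeof body.taskId === "string" &&
    typeof body.counter === "number" &&
    typeof links.claim === "string" &&
    typeof links.complete === "string"
  );
}
```

In the worker's `app.post("/")` handler, a body that fails this check could be answered with `res.status(400).send()` instead of being handed to `process`.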
`resonate.yml` contents:
```yaml
# api:
#   baseUrl: http://example.com
aio:
  subsystems:
    queuing:
      config:
        connections:
          - kind: http
            name: summarize
            metadata:
              properties:
                url: http://localhost:5001
        routes:
          - kind: pattern
            name: default
            target:
              connection: summarize
              queue: analytics
            metadata:
              properties:
                pattern: /gpu/summarize/*
```
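The route above sends any task whose id matches `/gpu/summarize/*` to the `summarize` connection on the `analytics` queue. The logs above show the id `/gpu/summarize/summarize4-http://example.com` reaching the worker, so `*` evidently matches characters including `/`. Here is a small TypeScript sketch of that matching under exactly that assumption; it is an illustration, not Resonate's Go implementation, and `matchesPattern` and `route` are hypothetical names.

```typescript
// Hypothetical sketch: "*" is assumed to match any run of characters, including "/".
function matchesPattern(pattern: string, taskId: string): boolean {
  // Escape regex metacharacters in the literal parts, then join them with ".*".
  const source = pattern
    .split("*")
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join(".*");
  // Anchor both ends so the whole task id must match.
  return new RegExp(`^${source}$`).test(taskId);
}

interface Route {
  pattern: string;
  connection: string;
  queue: string;
}

// Mirrors the single route in resonate.yml above.
const routes: Route[] = [
  { pattern: "/gpu/summarize/*", connection: "summarize", queue: "analytics" },
];

// Return the first route whose pattern matches, if any.
function route(taskId: string): Route | undefined {
  return routes.find((r) => matchesPattern(r.pattern, taskId));
}
```

With this sketch, `route("/gpu/summarize/summarize4-http://example.com")` returns the `analytics` route, while an id outside `/gpu/summarize/` returns `undefined`.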
Specifications
- Version: resonate/sdk ^0.5.0
- Platform: M1 Pro chip, 16 GB RAM, 512 GB SSD storage
Additional context
- Showed this bug live to @dtornow during the interview
- I was just following the documentation in the quickstart repo; this happened on Step 3.
Thank you so much for the detailed report, Nicolas! We dug into this and discovered that there are actually two bugs that worked together to make a bad situation worse :D
The first bug (and the quick fix) was in our quickstart code, specifically in the worker:
```ts
app.post('/', (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch(error => console.error("Error while processing task", error));
    res.status(200);
  } catch (e) {
    console.error("Error", e);
    res.status(500);
  }
});
```
The problem is that `res.status` does not actually complete the HTTP request: it only sets the status code, and we're missing `send`. If you change the code to the following, your example should work 🤞
```ts
app.post('/', (req: Request, res: Response) => {
  console.log("Task received", req.body);
  try {
    process(req.body).catch(error => console.error("Error while processing task", error));
    res.status(200).send();
  } catch (e) {
    console.error("Error", e);
    res.status(500).send();
  }
});
```
The second bug was in our server: we forgot to set a timeout for these HTTP requests, and if no timeout is specified, Go's HTTP client waits forever. Furthermore, by default our server has only a single worker to handle these HTTP requests (this is configurable), and if that worker is waiting forever, no subsequent tasks will be sent to workers (note that other parts of our server were unaffected). This waiting is also why the server would not shut down. We will remedy this by adding a timeout; thanks for helping us find this!
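To make the second bug concrete, here is a small TypeScript model of it. The real server is written in Go and this is not its code. It assumes a dispatcher with a single delivery slot that sends tasks strictly one at a time, and a hypothetical `send` function that settles when the worker answers. A worker that never answers, like the handler that calls `res.status(200)` without `send()`, blocks every later task unless each delivery is bounded by a timeout. `Send`, `withTimeout` and `dispatchSequentially` are hypothetical names.

```typescript
// A TypeScript model of the dispatch problem, not Resonate's Go code.
// `Send` is a hypothetical function that delivers one task to a worker and
// settles when the worker answers (or fails).
type Send = (taskId: string) => Promise<void>;

// Reject if `p` has not settled within `ms` milliseconds; the timer is
// always cleared so it does not keep the process alive.
function withTimeout<T>(p: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
  });
  return Promise.race([p, timeout]).finally(() => clearTimeout(timer));
}

// A single delivery slot: each task waits for the previous one to finish.
// Without `timeoutMs`, one worker that never answers blocks every later task.
async function dispatchSequentially(
  taskIds: string[],
  send: Send,
  timeoutMs?: number,
): Promise<string[]> {
  const log: string[] = [];
  for (const id of taskIds) {
    try {
      const attempt = send(id);
      await (timeoutMs === undefined ? attempt : withTimeout(attempt, timeoutMs));
      log.push(`${id}: delivered`);
    } catch (e) {
      log.push(`${id}: failed (${(e as Error).message})`);
    }
  }
  return log;
}

// Usage: the worker never answers "t1" (like a handler that calls
// res.status(200) without send()), but answers every other task at once.
const hangingSend: Send = (id) =>
  id === "t1" ? new Promise<void>(() => {}) : Promise.resolve();

dispatchSequentially(["t1", "t2", "t3"], hangingSend, 50).then((log) => console.log(log));
```

With `timeoutMs` set, the hanging delivery of `t1` fails after 50 ms and `t2` and `t3` are still delivered; without it, the loop never gets past `t1`, which matches the frozen queue in this report. In Go's standard library the comparable setting is `http.Client.Timeout`, whose zero value means no timeout.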