modelcontextprotocol/java-sdk

feat: Support separate endpoint path router in WebFluxStreamableServerTransportProvider

Opened this issue · 3 comments

Please do a quick search on GitHub issues first, the feature you are about to request might have already been requested.
#79
#80
#425
#432

Expected Behavior

The user can customize the endpoint routing functions.
We hope to wrap HTTP-APIs to MCP-Server-Tools.

  • Uses Spring WebFlux's RouterFunction for endpoint handling (GET, POST, DELETE)

We hope to support the follow MCP-Servers in one application process:

  • /mcp
  • /mcp/mcp-server-app-name-A -> some MCP-Tools
  • /mcp/mcp-server-app-name-B -> some MCP-Tools
  • /mcp/mcp-server-app-name-C -> some MCP-Tools

Current Behavior

The RouterFunction is private initialization in WebFluxStreamableServerTransportProvider, and its constructor is private.

public class WebFluxStreamableServerTransportProvider implements McpStreamableServerTransportProvider {

	private final String mcpEndpoint;

	private final RouterFunction<?> routerFunction;

	private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
			McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete,
			Duration keepAliveInterval) {
		
		this.mcpEndpoint = mcpEndpoint;
		
		this.routerFunction = RouterFunctions.route()
			.GET(this.mcpEndpoint, this::handleGet)
			.POST(this.mcpEndpoint, this::handlePost)
			.DELETE(this.mcpEndpoint, this::handleDelete)
			.build();
	}

	public RouterFunction<?> getRouterFunction() {
		return this.routerFunction;
	}

}
public abstract class RouterFunctions {

	public static Builder route() {
		return new RouterFunctionBuilder();
	}

}
class RouterFunctionBuilder implements RouterFunctions.Builder {

	private final List<RouterFunction<ServerResponse>> routerFunctions = new ArrayList<>();


	@Override
	public RouterFunctions.Builder add(RouterFunction<ServerResponse> routerFunction) {
		Assert.notNull(routerFunction, "RouterFunction must not be null");
		this.routerFunctions.add(routerFunction);
		return this;
	}

	@Override
	public RouterFunction<ServerResponse> build() {
		if (this.routerFunctions.isEmpty()) {
			throw new IllegalStateException("No routes registered. Register a route with GET(), POST(), etc.");
		}
		RouterFunction<ServerResponse> result = new BuiltRouterFunction(this.routerFunctions);

		if (this.filterFunctions.isEmpty() && this.errorHandlers.isEmpty()) {
			return result;
		}
		else {
			HandlerFilterFunction<ServerResponse, ServerResponse> filter =
					Stream.concat(this.filterFunctions.stream(), this.errorHandlers.stream())
							.reduce(HandlerFilterFunction::andThen)
							.orElseThrow(IllegalStateException::new);

			return result.filter(filter);
		}
	}


	/**
	 * Router function returned by {@link #build()} that simply iterates over the registered routes.
	 */
	private static class BuiltRouterFunction extends RouterFunctions.AbstractRouterFunction<ServerResponse> {

		private final List<RouterFunction<ServerResponse>> routerFunctions;

		public BuiltRouterFunction(List<RouterFunction<ServerResponse>> routerFunctions) {
			Assert.notEmpty(routerFunctions, "RouterFunctions must not be empty");
			this.routerFunctions = new ArrayList<>(routerFunctions);
		}

		@Override
		public Mono<HandlerFunction<ServerResponse>> route(ServerRequest request) {
			return Flux.fromIterable(this.routerFunctions)
					.concatMap(routerFunction -> routerFunction.route(request))
					.next();
		}

		@Override
		public void accept(RouterFunctions.Visitor visitor) {
			this.routerFunctions.forEach(routerFunction -> routerFunction.accept(visitor));
		}
	}

}

Context

API is MCP, allowing AI to connect to the real world with lower cost, speed, and security. The existing APIs can be instantly converted into a Remote MCP Server, laying out the shortest connection path between AI and the real world.

We need to start multiple WebFluxStreamableServerTransportProvider, McpAsyncServer instances in one application process. Please to see the follow code in McpServerConfiguration, that is reference to McpServerStreamableHttpWebFluxAutoConfiguration.

It can support the follow MCP-Servers:

  • /mcp
  • /mcp/mcp-server-app-name-A -> some MCP-Tools
  • /mcp/mcp-server-app-name-B -> some MCP-Tools

But the RouterFunction can not dynamic update when the database update for some new app-name MCP-Server.

  • /mcp/mcp-server-app-name-C
@Slf4j
@EnableConfigurationProperties({ McpServerStreamableHttpProperties.class })
@Configuration(proxyBeanMethods = false)
public class McpServerConfiguration {

    public McpServerConfiguration() {
        log.info("create McpServerConfiguration");
    }

    @Bean
    public Map<String, List<McpTool>> mcpToolListMap() {
        List<String> yamlFiles = List.of(
                "mcp-server-user-apis.yml",
                "mcp-server-travel-apis.yml"
        );

        return yamlFiles.stream()
                .map(YamlUtil::load)
                .collect(Collectors.toMap(
                        mcpServerRule -> mcpServerRule.getServer().getName(),
                        McpServerRule::getTools
                ));
    }

    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap(
            Map<String, List<McpTool>> mcpToolListMap) {
        log.info("init transportProviderMap");

        Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap =
                new ConcurrentHashMap<>(mcpToolListMap.size());
        transportProviderMap.putAll(McpServerTransportManager.transportProviderMap(mcpToolListMap.keySet()));
        return transportProviderMap;
    }

    /**
     * @see McpServerAutoConfiguration#capabilitiesBuilder()
     */
    @Bean
    public McpSchema.ServerCapabilities.Builder capabilitiesBuilder() {
        log.info("init capabilitiesBuilder");

        return McpSchema.ServerCapabilities.builder()
                .tools(true);
    }

    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    public Map<String, McpAsyncServer> mcpAsyncServerMap(
            Map<String, List<McpTool>> mcpToolListMap,
            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap,
            McpSchema.ServerCapabilities.Builder capabilitiesBuilder) {
        log.info("init mcpAsyncServerMap");

        return McpServerManager.mcpAsyncServerMap(mcpToolListMap, transportProviderMap, capabilitiesBuilder);
    }

    /**
     * @see McpServerStreamableHttpWebFluxAutoConfiguration#webFluxStreamableServerTransportProvider
     */
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public WebFluxStreamableServerTransportProvider webFluxStreamableServerTransportProvider(
            ObjectProvider<ObjectMapper> objectMapperProvider, McpServerStreamableHttpProperties serverProperties) {
        log.info("init webFluxStreamableServerTransportProvider");

        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);

        return WebFluxStreamableServerTransportProvider.builder()
                .objectMapper(objectMapper)
                .messageEndpoint(serverProperties.getMcpEndpoint())
                .keepAliveInterval(serverProperties.getKeepAliveInterval())
                .disallowDelete(serverProperties.isDisallowDelete())
                .build();
    }

    /**
     * @see McpServerStreamableHttpWebFluxAutoConfiguration#webFluxStreamableServerRouterFunction
     */
    // Router function for streamable http transport used by Spring WebFlux to start an
    // HTTP server.
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public RouterFunction<?> webFluxStreamableServerRouterFunction(
            WebFluxStreamableServerTransportProvider webFluxProvider,
            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap) {
        log.info("init webFluxStreamableServerRouterFunction");

        RouterFunctions.Builder routerFunctionBuilder = RouterFunctions.route();
        routerFunctionBuilder.add((RouterFunction<ServerResponse>) webFluxProvider.getRouterFunction());

        for (WebFluxStreamableServerTransportProvider transportProvider : transportProviderMap.values()) {
            routerFunctionBuilder.add((RouterFunction<ServerResponse>) transportProvider.getRouterFunction());
        }

        return routerFunctionBuilder.build();
    }

}

I try to the follow method, but not success to dynamic update RouterFunction.

public class McpServerConfiguration {

    @Bean
    public Map<String, List<McpTool>> mcpToolListMap(
            McpServiceToolHttpApiBizService httpApiBizService) {
        return httpApiBizService.loadData();
    }

    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap(
            Map<String, List<McpTool>> mcpToolListMap,
            ConfigurableListableBeanFactory beanFactory) {
        log.info("init transportProviderMap");

        Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap =
                new ConcurrentHashMap<>(mcpToolListMap.size());
        transportProviderMap.putAll(McpServerTransportManager.transportProviderMap(mcpToolListMap.keySet()));

        for (Map.Entry<String, WebFluxStreamableServerTransportProvider> entry : transportProviderMap.entrySet()) {
            String serverName = entry.getKey();
            WebFluxStreamableServerTransportProvider transportProvider = entry.getValue();
            // 小写连字符转驼峰
            String beanNamePrefix = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, serverName);
            String routerFunctionBeanName = beanNamePrefix + "RouterFunction";
            beanFactory.registerSingleton(routerFunctionBeanName, transportProvider.getRouterFunction());
        }

        return transportProviderMap;
    }

//    // Router function for streamable http transport used by Spring WebFlux to start an
//    // HTTP server.
//    @Bean
//    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
//    @Conditional({McpServerAutoConfiguration.EnabledStreamableServerCondition.class})
////    @SuppressWarnings("unchecked")
//    public RouterFunction<?> webFluxStreamableServerRouterFunction(
//            WebFluxStreamableServerTransportProvider webFluxProvider,
//            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap,
//            ConfigurableListableBeanFactory beanFactory) {
//        log.info("init webFluxStreamableServerRouterFunction");
//
////        RouterFunctions.Builder routerFunctionBuilder = RouterFunctions.route();
////        routerFunctionBuilder.add((RouterFunction<ServerResponse>) webFluxProvider.getRouterFunction());
////
////        for (WebFluxStreamableServerTransportProvider transportProvider : transportProviderMap.values()) {
////            routerFunctionBuilder.add((RouterFunction<ServerResponse>) transportProvider.getRouterFunction());
////        }
////
////        return routerFunctionBuilder.build();
//
//        for (Map.Entry<String, WebFluxStreamableServerTransportProvider> entry : transportProviderMap.entrySet()) {
//            String serverName = entry.getKey();
//            WebFluxStreamableServerTransportProvider transportProvider = entry.getValue();
//            // 小写连字符转驼峰
//            String beanNamePrefix = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, serverName);
//            String routerFunctionBeanName = beanNamePrefix + "RouterFunction";
//            beanFactory.registerSingleton(routerFunctionBeanName, transportProvider.getRouterFunction());
//        }
//
//        return webFluxProvider.getRouterFunction();
//    }

}

Because RouterFunctionMapping init afterPropertiesSet().

public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {

	@Nullable
	private RouterFunction<?> routerFunction;


	/**
	 * Create an empty {@code RouterFunctionMapping}.
	 * <p>If this constructor is used, this mapping will detect all
	 * {@link RouterFunction} instances available in the application context.
	 */
	public RouterFunctionMapping() {
	}

	/**
	 * Create a {@code RouterFunctionMapping} with the given {@link RouterFunction}.
	 * <p>If this constructor is used, no application context detection will occur.
	 * @param routerFunction the router function to use for mapping
	 */
	public RouterFunctionMapping(RouterFunction<?> routerFunction) {
		this.routerFunction = routerFunction;
	}


	/**
	 * Return the configured {@link RouterFunction}.
	 * <p><strong>Note:</strong> When router functions are detected from the
	 * ApplicationContext, this method may return {@code null} if invoked
	 * prior to {@link #afterPropertiesSet()}.
	 * @return the router function or {@code null}
	 */
	@Nullable
	public RouterFunction<?> getRouterFunction() {
		return this.routerFunction;
	}

	@Override
	public void afterPropertiesSet() throws Exception {

		if (this.routerFunction == null) {
			initRouterFunctions();
		}
		if (this.routerFunction != null) {
			RouterFunctions.changeParser(this.routerFunction, getPathPatternParser());
		}

	}

	/**
	 * Initialized the router functions by detecting them in the application context.
	 */
	protected void initRouterFunctions() {
		List<RouterFunction<?>> routerFunctions = routerFunctions();
		this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
		logRouterFunctions(routerFunctions);
	}

	private List<RouterFunction<?>> routerFunctions() {
		// 返回路由函数列表
		return obtainApplicationContext()
				.getBeanProvider(RouterFunction.class)
				.orderedStream()
				.map(router -> (RouterFunction<?>) router)
				.collect(Collectors.toList());
	}


	@Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		if (this.routerFunction != null) {
			ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
			return this.routerFunction.route(request)
					.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
		}
		else {
			return Mono.empty();
		}
	}

}

Does spring-projects/spring-ai#4645 help?

This PR can implement a custom RouterFunction<?> for user optional.

    /**
     * @see McpServerStreamableHttpWebFluxAutoConfiguration#webFluxStreamableServerRouterFunction
     */
    // Router function for streamable http transport used by Spring WebFlux to start an
    // HTTP server.
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public RouterFunction<?> webFluxStreamableServerRouterFunction(
            WebFluxStreamableServerTransportProvider webFluxProvider,
            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap) {
        log.info("init webFluxStreamableServerRouterFunction");

        RouterFunctions.Builder routerFunctionBuilder = RouterFunctions.route();
        routerFunctionBuilder.add((RouterFunction<ServerResponse>) webFluxProvider.getRouterFunction());

        for (WebFluxStreamableServerTransportProvider transportProvider : transportProviderMap.values()) {
            routerFunctionBuilder.add((RouterFunction<ServerResponse>) transportProvider.getRouterFunction());
        }

        return routerFunctionBuilder.build();
    }

No, the above code implement custom RouterFunction<?>. In fact, I need dynamic update it after application has started.
Such as, I can dynamic add a new MCP-Server of newApi, that need a new RouterFunction<?> of /mcp/mcp-server-{newApi}. Now, I try add a new RouterFunction<?> Bean by ConfigurableListableBeanFactory.registerSingleton(), but not effective and success.