toruseo/UXsim

Feature request: Estimate and record trip travel times

Closed this issue · 17 comments

It would be a very useful feature for me to be able to estimate and record travel times of trips in UXsim.

  • Estimate the expected travel time based on the current state of the network.
  • Record the final travel time a trip took.

The first can be used to make decisions (for example, by agents making a mode choice); the second can be useful as a metric / KPI.

They are available :)

Record the final travel time a trip took.

This is already implemented. Vehicle.travel_time records its trip travel time. Alternatively, you can output all vehicles' detailed travel logs with World.analyzer.vehicles_to_pandas() and analyze them however you want.

Estimate the expected travel time based on the current state of the network.

For this, so-called "instantaneous travel time" may be useful. Link.instant_travel_time(t) returns it for that link at time t. In fact, vehicles in UXsim basically use this information for their routing.

Vehicle.travel_time records its trip travel time.

I missed that, thanks!

so-called "instantaneous travel time" may be useful.

Do you think it would be useful to have a function that returns that total value for the shortest route from some node A to some node B? For example, if I'm an agent deciding to go by car or by bike from A to B, I might want to check what the current travel time is from A to B by car to base my decision on.

That is not difficult. You can create a graph whose link cost is Link.instant_travel_time(t) and then perform shortest path search.
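As a rough illustration of that idea, here is a minimal, library-independent sketch of a shortest-path search over per-link travel times. The `link_times` values are made-up numbers standing in for what Link.instant_travel_time(t) would return on each link, and `shortest_travel_time` is a hypothetical helper, not a UXsim function.

```python
import heapq

# Hypothetical per-link travel times in seconds at some time t, standing in
# for what Link.instant_travel_time(t) would return on each link.
# Keys are (start_node, end_node).
link_times = {
    ("O", "1"): 50.0,
    ("1", "D"): 50.0,
    ("O", "2"): 175.0,
    ("2", "D"): 25.0,
    ("1", "2"): 5.0,
}

def shortest_travel_time(link_times, origin, destination):
    """Dijkstra search; returns (total_time, node_path), or (inf, []) if unreachable."""
    # Build an adjacency list from the link dictionary.
    graph = {}
    for (u, v), cost in link_times.items():
        graph.setdefault(u, []).append((v, cost))

    best = {origin: 0.0}   # best known travel time to each node
    pred = {}              # predecessor of each node on its best path
    queue = [(0.0, origin)]
    while queue:
        time_so_far, node = heapq.heappop(queue)
        if node == destination:
            break
        if time_so_far > best.get(node, float("inf")):
            continue  # stale queue entry
        for nxt, cost in graph.get(node, []):
            candidate = time_so_far + cost
            if candidate < best.get(nxt, float("inf")):
                best[nxt] = candidate
                pred[nxt] = node
                heapq.heappush(queue, (candidate, nxt))

    if destination not in best:
        return float("inf"), []
    # Walk the predecessors back from the destination to rebuild the path.
    path = [destination]
    while path[-1] != origin:
        path.append(pred[path[-1]])
    return best[destination], path[::-1]

total, path = shortest_travel_time(link_times, "O", "D")
```

With real data, the dictionary would be rebuilt from the links at the time of interest before each query.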

RouteChoice.route_search_all() does a very similar thing, so it can be a basis for that. Maybe adding the following function to RouteChoice can work. I haven't tested it at all, so please check carefully.

    def route_search_all_on_time(s, t, infty=np.inf):
        """
        Compute the current shortest path based on instantaneous travel time at time t.
        Experimental, not tested.

        Parameters
        ----------
        t : float
            Time in seconds.
        infty : float
            value representing infinity.
            
        Returns
        -------
        np.array
            The distance matrix. dist[i,j] is the shortest distance from node i to node j.
        """
        adj_mat_time = np.zeros([len(s.W.NODES), len(s.W.NODES)])
        for link in s.W.LINKS:
            i = link.start_node.id
            j = link.end_node.id
            if s.W.ADJ_MAT[i,j]:
                adj_mat_time[i,j] = link.instant_travel_time(t) + link.route_choice_penalty #TODO: link.route_choice_penalty may be time-dependent
                if link.capacity_in == 0: # impassable if inflow is prohibited
                    adj_mat_time[i,j] = np.inf
            else:
                adj_mat_time[i,j] = np.inf

        dist, pred = floyd_warshall(adj_mat_time, return_predecessors=True)

        return dist 

I would love to have this built-in in UXsim. I will take another look at this when I'm back from vacation next week, but if you have ideas in the meantime, please share!

I have updated Utilities.enumerate_k_shortest_routes and added Utilities.enumerate_k_shortest_routes_on_t. They can be used for this purpose. Currently they are 1-to-1 shortest path searches, but it should be easy to extend them to many-to-many.

For the use cases, see

def test_k_shortest_path():
    W = World(
        name="", # Scenario name
        deltan=5, # Simulation aggregation unit delta n
        tmax=1200, # Total simulation time (s)
        print_mode=1, save_mode=1, show_mode=1, # Various options
        random_seed=0 # Set the random seed
    )

    # Define the scenario
    # 2 - D
    # | \ |
    # O - 1
    #free flow travel time:
    # O1D = 2000/20 = 100
    # O2D = 4000/20 = 200
    # O12D= 1600/20 = 80
    W.addNode(name="O", x=0, y=0)
    W.addNode(name="1", x=1, y=0)
    W.addNode(name="2", x=0, y=1)
    W.addNode(name="D", x=1, y=1)
    W.addLink("O1", "O", "1", length=1000, free_flow_speed=20, number_of_lanes=1)
    W.addLink("1D", "1", "D", length=1000, free_flow_speed=20, number_of_lanes=1)
    W.addLink("O2", "O", "2", length=3500, free_flow_speed=20, number_of_lanes=1)
    W.addLink("2D", "2", "D", length=500, free_flow_speed=20, number_of_lanes=1)
    W.addLink("12", "1", "2", length=100, free_flow_speed=20, number_of_lanes=1)
    W.adddemand(orig="O", dest="D", t_start=0, t_end=1000, flow=0.6)

    # Run the simulation to the end
    W.exec_simulation()

    # Print summary of simulation result
    W.analyzer.print_simple_stats()

    df = W.analyzer.link_to_pandas()
    assert df[df["link"]=="O1"]["traffic_volume"].values[0] == 600
    assert df[df["link"]=="O2"]["traffic_volume"].values[0] == 0
    assert df[df["link"]=="12"]["traffic_volume"].values[0] == 600

    assert enumerate_k_shortest_routes(W, "O", "D") == [['O1', '12', '2D']]
    assert enumerate_k_shortest_routes(W, "O", "D", k=3) == [['O1', '12', '2D'], ['O1', '1D'], ['O2', '2D']]
    assert enumerate_k_shortest_routes(W, "O", "D", k=3, return_cost=True) == ([['O1', '12', '2D'], ['O1', '1D'], ['O2', '2D']], [80.0, 100.0, 200.0])

@pytest.mark.flaky(reruns=10)
def test_k_shortest_path_on_t():
    W = World(
        name="", # Scenario name
        deltan=5, # Simulation aggregation unit delta n
        tmax=1200, # Total simulation time (s)
        print_mode=1, save_mode=1, show_mode=1, # Various options
        random_seed=None # Set the random seed
    )

    # Define the scenario
    # 2 - D
    # | \ |
    # O - 1
    #free flow travel time:
    # O1D = 2000/20 = 100
    # O2D = 4000/20 = 200
    # O12D= 1600/20 = 80
    W.addNode(name="O", x=0, y=0)
    W.addNode(name="1", x=1, y=0)
    W.addNode(name="2", x=0, y=1)
    W.addNode(name="D", x=1, y=1)
    W.addLink("O1", "O", "1", length=1000, free_flow_speed=20, number_of_lanes=1)
    W.addLink("1D", "1", "D", length=1000, free_flow_speed=20, number_of_lanes=1)
    W.addLink("O2", "O", "2", length=3500, free_flow_speed=20, number_of_lanes=1)
    W.addLink("2D", "2", "D", length=500, free_flow_speed=20, number_of_lanes=1)
    W.addLink("12", "1", "2", length=100, free_flow_speed=20, number_of_lanes=1, capacity_out=0.4)
    W.adddemand(orig="O", dest="D", t_start=0, t_end=1000, flow=0.6)

    # Run the simulation to the end
    W.exec_simulation()

    # Print summary of simulation result
    W.analyzer.print_simple_stats()

    df = W.analyzer.link_to_pandas()
    assert equal_tolerance(df[df["link"]=="O1"]["traffic_volume"].values[0], 460, rel_tol=0.2)
    assert equal_tolerance(df[df["link"]=="O2"]["traffic_volume"].values[0], 140, rel_tol=0.2)
    assert equal_tolerance(df[df["link"]=="12"]["traffic_volume"].values[0], 325, rel_tol=0.2)

    t = 0
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t) == [['O1', '12', '2D']]
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3) == [['O1', '12', '2D'], ['O1', '1D'], ['O2', '2D']]
    routes, costs = enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3, return_cost=True)
    assert routes[0] == ['O1', '12', '2D']
    assert equal_tolerance(costs[0], 80.0)

    t = 200
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t) == [['O1', '1D']]
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3) == [['O1', '1D'], ['O1', '12', '2D'], ['O2', '2D']]
    routes, costs = enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3, return_cost=True)
    assert routes[0] == ['O1', '1D']
    assert equal_tolerance(costs[0], 131.8181818181818)

    t = 400
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t) == [['O2', '2D']]
    assert enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3) == [['O2', '2D'], ['O1', '1D'], ['O1', '12', '2D']]
    routes, costs = enumerate_k_shortest_routes_on_t(W, "O", "D", t=t, k=3, return_cost=True)
    assert routes[0] == ['O2', '2D']
    assert equal_tolerance(costs[0], 200.0)

@toruseo What's the difference between Utilities.enumerate_k_shortest_routes and Utilities.enumerate_k_shortest_routes_on_t? The first gives the travel time now and the other one on a custom time? Or the one is instantaneous and the other actual?

Edit: The first is free-flow travel time, the other on the travel time at some point? But that point can also be now or in the past?

A bit more documentation on these functions would be useful I think.

This is not a user-friendly method currently. It takes a while to configure and needs way too many parameters, which is error-prone. For example, which time do I need to use? W.T?

  File "C:\Users\Ewout\.virtualenvs\Py312\src\uxsim\uxsim\uxsim.py", line 690, in instant_travel_time
    return s.traveltime_instant[-1]
           ~~~~~~~~~~~~~~~~~~~~^^^^
IndexError: list index out of range

I would suggest adding two functions, directly to the World class:

def free_flow_travel_time(self, origin, destination):
def estimated_travel_time(self, origin, destination, time=self.T):

Where origin and destination are both nodes. This way you can easily retrieve both the free-flow travel time and the estimated travel time between two nodes. The latter defaults to the current time (that's self.T, right?), but can also be evaluated at some other time (I still don't really know how this is supposed to work).

Because you don't need to know the exact route, you can use optimized functions like nx.shortest_path_length.

Adding these two functions reduces the potential error of parsing world objects, or using the wrong time.
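To make the proposal concrete, here is a minimal sketch of what such a pair of functions could look like. Everything in it is hypothetical: `TravelTimeLookup`, its `links` dictionary layout, and `current_time` are invented for illustration and are not part of UXsim. Note that a default such as `time=self.T` is not valid Python, so the sketch uses `None` as a sentinel for "the current time".

```python
import heapq

class TravelTimeLookup:
    """Hypothetical helper illustrating the proposed API; not part of UXsim."""

    def __init__(self, links, current_time=0.0):
        # links: {(start, end): {"free_flow": seconds, "instant": {time: seconds}}}
        self.links = links
        self.current_time = current_time

    def _shortest_time(self, origin, destination, cost_of):
        """Dijkstra search returning only the total cost, not the route."""
        best = {origin: 0.0}
        queue = [(0.0, origin)]
        while queue:
            d, node = heapq.heappop(queue)
            if node == destination:
                return d
            if d > best[node]:
                continue  # stale queue entry
            for (u, v), data in self.links.items():
                if u != node:
                    continue
                nd = d + cost_of(data)
                if nd < best.get(v, float("inf")):
                    best[v] = nd
                    heapq.heappush(queue, (nd, v))
        return float("inf")

    def free_flow_travel_time(self, origin, destination):
        return self._shortest_time(origin, destination, lambda d: d["free_flow"])

    def estimated_travel_time(self, origin, destination, time=None):
        # None means "now"; resolved at call time, unlike a default argument.
        if time is None:
            time = self.current_time
        return self._shortest_time(origin, destination, lambda d: d["instant"][time])

# Made-up link data for a four-node network, recorded at t=0 and t=200.
links = {
    ("O", "1"): {"free_flow": 50.0, "instant": {0: 50.0, 200: 60.0}},
    ("1", "D"): {"free_flow": 50.0, "instant": {0: 50.0, 200: 70.0}},
    ("1", "2"): {"free_flow": 5.0, "instant": {0: 5.0, 200: 90.0}},
    ("2", "D"): {"free_flow": 25.0, "instant": {0: 25.0, 200: 25.0}},
    ("O", "2"): {"free_flow": 175.0, "instant": {0: 175.0, 200: 175.0}},
}
lookup = TravelTimeLookup(links, current_time=200)
free_flow = lookup.free_flow_travel_time("O", "D")
now = lookup.estimated_travel_time("O", "D")
at_start = lookup.estimated_travel_time("O", "D", time=0)
```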

enumerate_k_shortest_routes(W, "O", "D") returns the shortest path from node "O" to "D" based on the free-flow travel time. enumerate_k_shortest_routes_on_t(W, "O", "D", t=t) returns the one based on the instantaneous travel time at time t. I believe this is the simplest implementation. If advanced users need different cost functions, they need to specify the cost_function argument, which is a bit tricky, but I think this is an acceptable cost.

Because you don't need to know the exact route, you can use optimized functions like nx.shortest_path_length.

You don't need it, but others may.

Thanks for getting back, and for the extensive explanation. I will try to get it working again, but because there are so many variables and code paths, it is relatively difficult to debug.

I completely understand that there are other use cases and some people indeed may want routes as output. I think in a simulation library where performance matters, especially with a function that needed to be called once per trip, it might be nice to have optimized, specialized functions in some cases.

Two questions:

  • Since a World is needed as input anyway, what’s the reason that it’s a separate function outside the World class?
  • Is World.T the time variable I need to use for the current simulation time?

For now I worked around it. For reference: EwoutH/urban-self-driving-effects@dd93ee0

Since a World is needed as input anyway, what’s the reason that it’s a separate function outside the World class?

Because it is not required for the default simulation logic. I want to make the uxsim core module as simple as possible.

As a side note, one of my design philosophies on this point is that we should not consider a route choice problem explicitly. If we enumerate routes, it becomes intractable very quickly due to the exponential number of routes in a network. Route choice models without route enumeration, such as recursive logit, are a bit advanced and computationally costly, and the IIA property will be problematic (there are several workarounds for IIA, but they are more complicated). Thus, I believe the current stochastic DUO is the most balanced solution, and the necessary functions are provided by the RouteChoice class.

So, my stance is that advanced route choice or other models should preferably be defined outside of the simulation by the user. I believe there are sufficient interfaces for that. Of course, I highly welcome if such models are added to the Utilities submodule. I plan to add some day-to-day dynamical models.

I think in a simulation library where performance matters, especially with a function that needed to be called once per trip, it might be nice to have optimized, specialized functions in some cases.

All-to-all shortest path search with time-dependent data would be useful. I think they can be coded by modifying the current ones.
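A minimal sketch of such an all-to-all search, written in plain Python so it runs without UXsim. The `times_by_t` numbers are invented, and `all_pairs_travel_times` is a hypothetical helper; in practice the per-link costs would come from the simulation at each time of interest.

```python
INF = float("inf")

def all_pairs_travel_times(nodes, link_times):
    """Floyd-Warshall over a dict {(u, v): seconds}; returns dist[u][v]."""
    # Start with zero on the diagonal and infinity elsewhere.
    dist = {u: {v: (0.0 if u == v else INF) for v in nodes} for u in nodes}
    for (u, v), cost in link_times.items():
        dist[u][v] = min(dist[u][v], cost)
    # Relax every pair (i, j) through every intermediate node k.
    for k in nodes:
        for i in nodes:
            for j in nodes:
                if dist[i][k] + dist[k][j] < dist[i][j]:
                    dist[i][j] = dist[i][k] + dist[k][j]
    return dist

nodes = ["O", "1", "2", "D"]
# Hypothetical link travel times (seconds) at two simulation times.
times_by_t = {
    0: {("O", "1"): 50.0, ("1", "D"): 50.0, ("O", "2"): 175.0,
        ("2", "D"): 25.0, ("1", "2"): 5.0},
    400: {("O", "1"): 120.0, ("1", "D"): 90.0, ("O", "2"): 175.0,
          ("2", "D"): 25.0, ("1", "2"): 60.0},
}
# One distance table per time, so lookups afterwards are dictionary reads.
dist_by_t = {t: all_pairs_travel_times(nodes, lt) for t, lt in times_by_t.items()}
```

Computing the whole table once per time step keeps each per-trip query cheap, at the price of cubic work in the number of nodes for every table.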

Is World.T the time variable I need to use for the current simulation time?

W.T is a timestep number. W.TIME is time in seconds, which is equal to W.T*W.DELTAT

UXsim/uxsim/uxsim.py

Lines 1877 to 1878 in 2e61fd5

W.T = 0 #timestep
W.TIME = 0 #s
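The relation between the two can be written out as a small conversion sketch. The function names are hypothetical, and the timestep width of 5 seconds is only an assumed value for illustration; the real width is whatever W.DELTAT is in a given simulation.

```python
def timestep_to_seconds(timestep, deltat):
    """Convert a timestep count (like W.T) to seconds (like W.TIME)."""
    return timestep * deltat

def seconds_to_timestep(seconds, deltat):
    """Convert seconds to the index of the timestep that contains them."""
    return int(seconds // deltat)

deltat = 5  # assumed timestep width in seconds, for illustration only
time_s = timestep_to_seconds(40, deltat)
step = seconds_to_timestep(203, deltat)
```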

Thanks for getting back, it's appreciated!

I want to make the uxsim core module as simple as possible.

I understand that, and I think that's a good approach.

Could we make a second module that can be imported/inherited into World specifically for travel time lookups and calculations? Like a kind of extension? That might be very useful for certain research.

we should not consider a route choice problem explicitly

I fully agree! However, to facilitate mode choice properly (which I think is something UXsim could be very useful for?), it would be nice to have a fast way to get expected travel time for a single route or for the whole network.
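As a toy illustration of that use case, an agent could compare an estimated car travel time with a bike travel time derived from distance. The function, the 5 m/s bike speed, and the input values are all assumptions made up for this sketch; a real model would likely use something richer than "pick the faster mode".

```python
def choose_mode(car_time_s, distance_m, bike_speed_mps=5.0):
    """Pick the faster mode; returns (mode_name, travel_time_seconds)."""
    bike_time_s = distance_m / bike_speed_mps
    # Ties go to the car in this sketch.
    if car_time_s <= bike_time_s:
        return "car", car_time_s
    return "bike", bike_time_s

# Uncongested case: the car is faster over 1600 m.
uncongested = choose_mode(car_time_s=80.0, distance_m=1600.0)
# Congested case: the same trip by car now takes longer than cycling.
congested = choose_mode(car_time_s=400.0, distance_m=1600.0)
```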

All-to-all shortest path search with time-dependent data would be useful.

I think I implemented that successfully here. Feel free to incorporate it somewhere in UXsim!

W.T is a timestep number. W.TIME is time in seconds, which is equal to W.T*W.DELTAT.

Thanks, this helps. And every time some time value is requested, it's generally W.TIME, right?

You can get ongoing simulation variables with code like this:

#dict for time-dependent travel time
tdtt = {}
# Execute simulation 
# Run the simulation for a specific duration (for cases where intervention is desired midway) 
while W.check_simulation_ongoing(): 
    # Execute the simulation in increments of 10 seconds 
    W.exec_simulation(duration_t=10) 
    tdtt[W.TIME] = W.ROUTECHOICE.dist

Note that this may not be executable as I haven't debugged it. For a fully working example, see https://github.com/toruseo/UXsim/blob/main/demos_and_examples/example_08en_signal_reactive_control.py
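Once snapshots are stored in a dictionary keyed by simulation time, as tdtt is above, a later query usually falls between two recorded times. A minimal sketch of looking up the latest snapshot at or before a query time follows; `snapshot_at` is a hypothetical helper, and the stored values are plain floats standing in for the recorded distance matrices.

```python
from bisect import bisect_right

def snapshot_at(tdtt, query_time):
    """Return (recorded_time, value) for the latest snapshot at or before query_time."""
    times = sorted(tdtt)
    idx = bisect_right(times, query_time) - 1
    if idx < 0:
        raise KeyError(f"no snapshot recorded at or before t={query_time}")
    return times[idx], tdtt[times[idx]]

# Hypothetical snapshots: each value stands in for the distance matrix
# stored at that simulation time; plain floats keep the example short.
tdtt = {10: 80.0, 20: 95.0, 30: 131.8}
recorded_time, value = snapshot_at(tdtt, 27)
```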

Unfortunately I found a small bug in W.check_simulation_ongoing() and W.exec_simulation(duration_t=10) today, but I expect it is not critical. I will open an issue later.

Thanks, this helps. And every time some time value is requested, it's generally W.TIME, right?

Generally yes.

FYI I will be very busy in the next few weeks and don't have time to code.

No worries, thanks for all your time and effort so far!

New academic year starting?

I believe this is completed. If not, plz re-open