GeoscienceAustralia/anuga_core

anuga Flux inflow big problem

Closed this issue · 5 comments

Hy everyone, I want to model with anuga a flux inflow (Q[m^3/s]) in my domain, in order to simulate a flooding event in the Adige river, that is a quite important river with a flooding pick event of 2500 m^3/s. I 've tried to use a Reflective boundary coupled to an inlet operator but the problem is that with the inlet operator the Flux has null velocity and so in my case I have a massive amount of water that comes in without his real velocity (uniform flow). This led to smaller velocity in all the stream course with the velocity that increases itself towards the close section.
Smaller velocity causes higher water depths with the water wrongly going out of the river bed.
Thank you very much for your kindness and attention,
Best regards,
Michele Zucchelli.

@mikilterribile boundary conditions for the shallow water wave equation are mathematically a bit subtle. Depending on the situation you need to specify 3, 2, 1, or 0 conditions. So specifying just inflow discharge might not be enough to sensibly specify enough conditions to get the inflow you want.

So for a given inflow discharge I suggest exactly what you have done, setup a reflective BC and an Inlet_operator with the specified discharge and let the program work out the steady down stream flow. The Inlet_operator does take a velocity argument [u,v] which you might like to play with to improve your simulation.

Another tweak is to entend your mesh a little upstream to represent only the river, and apply reflective boundaries on this extension. Put your Inlet_operator at the top of this river extension, and make the extension long enough for the flow to become steady. You get no overflow of the river bank as the river bank is modelled by the reflective BC.

An another tweak. If the flow turns out to be in a low Froude regime, then you might want to use the low Froude flag.

domain.set_low_froude(1)

This tends to allow faster flow in low froude regions.

@stoiver, thank you for your answer and your useful tips. I have tried to pass the velocity argument at the inlet_operator but this led to a divergent increase of velocity. Infact, the inlet operator adds the velocity at the current velocity value and this makes no sense, in my opinion. In order to overcome this, I have created a slightly modified version of the Inlet_operator, but I'm not sure that this is correct.. Certainly, the divergence problem doesn't exists anymore and the problem is stable, with the correct velocity upon the inlet_operator:

class My_inlet_operator(anuga.Operator):
    """Inlet Operator - add water to an inlet.
    Sets up the geometry of problem

    Inherit from this class (and overwrite
    discharge_routine method for specific subclasses)

    Input: domain, Two points
    """

    def __init__(self,
                 domain,
                 region,
                 Q=0.0,
                 velocity=None,
                 zero_velocity=False,
                 default=0.0,
                 description=None,
                 label=None,
                 logging=False,
                 verbose=False):
        """Inlet Operator - add water to a domain via an inlet.

        :param domain: Specify domain
        :param region: Apply Inlet flow over a region (which can be a Region, Polygon or line)
        :param Q: function(t) or scalar discharge (m^3/s)
        :param velocity: Optional [u,v] to set velocity of applied discharge
        :param zero_velocity: If set to True, velocity of inlet region set to 0
        :param default: If outside time domain of the Q function, use this default discharge
        :param description: Describe the Inlet_operator
        :param label: Give Inlet_operator a label (name)
        :param verbose: Provide verbose output



        Example:

        >>> inflow_region  = anuga.Region(domain, center=[0.0,0.0], radius=1.0)
        >>> inflow = anuga.Inlet_operator(domain, inflow_region, Q = lambda t : 1 + 0.5*math.sin(t/60))

        """

        anuga.Operator.__init__(self, domain, description, label, logging, verbose)

        self.inlet = inlet.Inlet(self.domain, region, verbose=verbose)

        # should set this up to be a function of time and or space)
        self.Q = Q

        if velocity is not None:
            assert len(velocity) == 2

        self.velocity = velocity

        self.zero_velocity = zero_velocity

        self.applied_Q = 0.0

        self.total_applied_volume = 0.0

        self.set_default(default)

        self.activate_logging()

        # print('Setting up inlet operator')

    def __call__(self):

        timestep = self.domain.get_timestep()

        # print('Timestep', timestep)

        t = self.domain.get_time()

        # Need to run global command on all processors
        current_volume = self.inlet.get_total_water_volume()
        total_area = self.inlet.get_area()

        # print(current_volume)
        # print(total_area)

        assert current_volume >= 0.0

        Q1 = self.update_Q(t)
        Q2 = self.update_Q(t + timestep)

        # print Q1,Q2
        Q = 0.5 * (Q1 + Q2)
        volume = Q * timestep

        # print(volume)
        # print volume

        # print Q, volume

        # store last discharge
        self.applied_Q = Q

        # print(self.domain.fractional_step_volume_integral)

        # Distribute positive volume so as to obtain flat surface otherwise
        # just pull water off to have a uniform depth.
        if volume >= 0.0:
            # print('volume>=0.0')
            self.inlet.set_stages_evenly(volume)
            self.domain.fractional_step_volume_integral += volume
            if self.velocity is not None:
                depths = self.inlet.get_depths()
                #self.inlet.set_xmoms(self.inlet.get_xmoms() + depths * self.velocity[0])
                #self.inlet.set_ymoms(self.inlet.get_ymoms() + depths * self.velocity[1])
                self.inlet.set_xmoms(depths * self.velocity[0])
                self.inlet.set_ymoms(depths * self.velocity[1])
            if self.zero_velocity:
                self.inlet.set_xmoms(0.0)
                self.inlet.set_ymoms(0.0)

        elif current_volume + volume >= 0.0:
            depth = (current_volume + volume) / total_area
            self.inlet.set_depths(depth)
            self.domain.fractional_step_volume_integral += volume
            if self.zero_velocity:
                self.inlet.set_xmoms(0.0)
                self.inlet.set_ymoms(0.0)
        else:  # extracting too much water!
            self.inlet.set_depths(0.0)
            volume = -current_volume
            self.applied_Q = -current_volume / timestep
            self.domain.fractional_step_volume_integral -= current_volume
            self.applied_Q = - current_volume / timestep
            if self.zero_velocity:
                self.inlet.set_xmoms(0.0)
                self.inlet.set_ymoms(0.0)

            # msg =  'Requesting too much water to be removed from an inlet! \n'
            # msg += 'current_water_volume = %5.2e Increment volume = %5.2e' % (current_volume, volume)
        self.total_applied_volume += volume..."

@mikilterribile thanks for picking up the bug in Inlet_operator regarding the setting of velocity. I have updated it in the develop branch of anuga_core

I have also created a version of the inlet operator modified to accept a function with three parameters: Q, x-velocity, y-velocity. This version is suitable for modelling a more complex inflow function such as the uniform flow. I hope this will be usefuf.

#Stavolta la funzione ritorna anche i valori delle velocità u e v (Q,u,v)
class Inlet_operator_velocity_in_function(anuga.Operator):
def init(self,
domain,
region,
Q=(0.0, 0.0, 0.0), #una funzione oppure tre valori costanti (una tupla)
default=0.0,
description=None,
label=None,
logging=False,
verbose=False):

    anuga.Operator.__init__(self, domain, description, label, logging, verbose)

    self.inlet = inlet.Inlet(self.domain, region, verbose=verbose)

    # should set this up to be a function of time and or space)
    self.Q = Q

    self.applied_Q = 0.0

    self.total_applied_volume = 0.0

    self.set_default(default)

    self.activate_logging()

    # print('Setting up inlet operator')

def __call__(self):

    timestep = self.domain.get_timestep()
    t = self.domain.get_time()

    # Need to run global command on all processors
    current_volume = self.inlet.get_total_water_volume()
    total_area = self.inlet.get_area()

    assert current_volume >= 0.0

    Q1, u1, v1 = self.update_Q(t)
    Q2, u2, v2 = self.update_Q(t + timestep)

    # print Q1,Q2
    Q = 0.5 * (Q1 + Q2)
    u = 0.5 * (u1 + u2)
    v = 0.5 * (v1 + v2)
    volume = Q * timestep

    # store last discharge
    self.applied_Q = Q

    # Distribute positive volume so as to obtain flat surface otherwise
    # just pull water off to have a uniform depth.
    if volume >= 0.0:
        self.inlet.set_stages_evenly(volume)
        self.domain.fractional_step_volume_integral += volume
        depths = self.inlet.get_depths()
        self.inlet.set_xmoms(depths * u)
        self.inlet.set_ymoms(depths * v)

    elif current_volume + volume >= 0.0:
        depth = (current_volume + volume) / total_area
        self.inlet.set_depths(depth)
        self.domain.fractional_step_volume_integral += volume

    else:  # extracting too much water!
        self.inlet.set_depths(0.0)
        volume = -current_volume
        self.applied_Q = -current_volume / timestep
        self.domain.fractional_step_volume_integral -= current_volume
        self.applied_Q = - current_volume / timestep

        # msg =  'Requesting too much water to be removed from an inlet! \n'
        # msg += 'current_water_volume = %5.2e Increment volume = %5.2e' % (current_volume, volume)
    self.total_applied_volume += volume

def update_Q(self, t):
    """Allowing local modifications of Q
    """
    from anuga.fit_interpolate.interpolate import Modeltime_too_early, Modeltime_too_late

    if callable(self.Q):
        try:
            Q, u, v = self.Q(t)
        except Modeltime_too_early as e:
            Q, u, v = self.get_default(t, err_msg=e)
        except Modeltime_too_late as e:
            Q, u, v = self.get_default(t, err_msg=e)
    else:
        Q, u, v = self.Q

    return Q, u, v

def statistics(self):

    message = '=====================================\n'
    message += 'Inlet Operator: %s\n' % self.label
    message += '=====================================\n'

    message += 'Description\n'
    message += '%s' % self.description
    message += '\n'

    inlet = self.inlet

    message += '-------------------------------------\n'
    message += 'Inlet\n'
    message += '-------------------------------------\n'

    message += 'inlet triangle indices and centres\n'
    message += '%s' % inlet.triangle_indices
    message += '\n'

    message += '%s' % self.domain.get_centroid_coordinates()[inlet.triangle_indices]
    message += '\n'

    message += 'region\n'
    message += '%s' % inlet
    message += '\n'

    message += '=====================================\n'

    return message

def timestepping_statistics(self):

    message = '---------------------------\n'
    message += 'Inlet report for %s:\n' % self.label
    message += '--------------------------\n'
    message += 'Q [m^3/s]: %.2f\n' % self.applied_Q
    message += 'Total volume [m^3]: %.2f\n' % self.total_applied_volume

    return message

def print_timestepping_statisitics(self):

    message = self.timestepping_statistics()
    print(message)

def set_default(self, default=None):

    """ Either leave default as None or change it into a function"""

    if default is not None:
        # If it is a constant, make it a function
        if not callable(default):
            tmp = default
            default = lambda t: tmp

        # Check that default_rate is a function of one argument
        try:
            default(0.0)
        except:
            msg = "could not call default"
            raise Exception(msg)

    self.default = default
    self.default_invoked = False

def get_default(self, t, err_msg=' '):
    """ Call get_default only if exception
    Modeltime_too_late(msg) has been raised
    """

    # Pass control to default rate function
    value = self.default(t)

    if self.default_invoked is False:
        # Issue warning the first time
        msg = ('%s\n'
               'Instead I will use the default rate: %s\n'
               'Note: Further warnings will be suppressed'
               % (str(err_msg), str(self.default(t))))

        warnings.warn(msg)

        # FIXME (Ole): Replace this crude flag with
        # Python's ability to print warnings only once.
        # See http://docs.python.org/lib/warning-filter.html
        self.default_invoked = True

    return value

def set_Q(self, Q):

    self.Q = Q

def get_Q(self):

    return self.Q

def get_inlet(self):

    return self.inlet

def get_applied_Q(self):

    return self.applied_Q

def get_total_applied_volume(self):

    return self.total_applied_volume

Thank you