insitro/redun

Hashing a Task to include its dependencies

Opened this issue · 2 comments

Let's say I have the following code:

import numpy as np
import pandas as pd

from redun import task


def log_transform(s: pd.Series):
    return np.log(s)


@task()
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    for c in df.columns:
        df[c] = log_transform(df[c])

    return df

Where log_transform is a function which

  • is invoked by multiple tasks
  • is generally useful, e.g. for interactive work within a jupyter notebook
  • perhaps even lives in a separate module, e.g. somewhere under src/utils.py

Now suppose I realize that this function is not very smart, and I want to update it to

def log_transform(s: pd.Series):
    return np.log(s + 1)

This change potentially affects every task that depends on this code.

My question is the following: would it be a good idea to hash all the internal source code invoked by a task? Would it be possible? If it's not a good idea or not possible, what best practices do people follow to avoid such situations?

I can imagine it must be a common problem, unless all the code one writes is wrapped with task decorators?
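To make the failure mode concrete, here is a rough, self-contained illustration of why a cache keyed only on a task's own source misses the change (this is not redun's actual hashing code, and transform_features_plain is a hypothetical undecorated stand-in for the task):

import hashlib
import inspect

import numpy as np


def log_transform(s):
    return np.log(s)  # later edited to np.log(s + 1)


def transform_features_plain(df):  # undecorated stand-in for the task
    for c in df.columns:
        df[c] = log_transform(df[c])
    return df


def naive_source_hash(fn) -> str:
    # Hash only the function's own source text.
    return hashlib.sha256(inspect.getsource(fn).encode()).hexdigest()


# transform_features_plain's body never mentions np.log directly, so
# its hash, and any cache keyed on it, is identical before and after
# the log_transform edit.
print(naive_source_hash(transform_features_plain))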

Edit:
Of course, one would probably want to check only user-defined functions, e.g.

@task(search_dependencies_under="../src/")
def transform_features(...):
    ...

Thanks for a great question.

My question is the following: would it be a good idea to hash all the internal source code invoked by a task? Would it be possible? If it's not a good idea or not possible, what best practices do people follow to avoid such situations?

So far, we've erred on the side of caution and avoided trying to infer all the code reachable from a task. Currently, redun only hashes the source code of the task itself. You can also use manual versioning (@task(version="2")) and bump the version whenever you know you are changing a function reachable from the task (it works, but it's not very convenient).
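For example, bumping the version by hand after the helper changes:

import numpy as np
import pandas as pd

from redun import task


def log_transform(s: pd.Series):
    return np.log(s + 1)  # helper changed ...


@task(version="2")  # ... so bump the version to invalidate cached results
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    for c in df.columns:
        df[c] = log_transform(df[c])

    return df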

We have thought about how to let users opt additional plain Python functions into a task's hash, similar to the idea you describe.

One design is to do something like this:

def log_transform(s: pd.Series):
    return np.log(s + 1)

@task(hash_includes=[log_transform])
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    for c in df.columns:
        df[c] = log_transform(df[c])

    return df

hash_includes could also accept other objects that should influence the hash (classes, global variables, Docker image hashes, file hashes, third-party functions, etc.).
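For instance, a task might declare several kinds of non-task state at once (hypothetical usage; EPSILON and Scaler are made-up examples, and the exact semantics would depend on the final implementation):

import numpy as np
import pandas as pd

from redun import task

EPSILON = 1.0  # global constant that affects results


def log_transform(s: pd.Series):
    return np.log(s + EPSILON)


class Scaler:
    def fit_transform(self, s: pd.Series) -> pd.Series:
        return (s - s.mean()) / s.std()


# Editing log_transform, Scaler, or EPSILON would change the task
# hash and invalidate previously cached results.
@task(hash_includes=[log_transform, Scaler, EPSILON])
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    scaler = Scaler()
    for c in df.columns:
        df[c] = scaler.fit_transform(log_transform(df[c]))

    return df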

I like your idea about using static analysis to find reachable functions while constraining it to user code via search_dependencies_under. My one concern about relying on static analysis is that it can be hard to predict what gets included (think of functions as values, methods on classes and their superclasses, etc.), but maybe it's not so bad.
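To illustrate the concern, a naive static pass might just walk the AST for names referenced by a function (a rough sketch, not a real implementation):

import ast
import inspect
import textwrap


def referenced_names(fn) -> set:
    # Collect every bare name loaded in the function body. This
    # both over-approximates the dependency set (it picks up locals
    # and builtins) and under-approximates it (it misses attribute
    # calls like scaler.fit_transform(...), functions passed in as
    # values, and superclass methods resolved at runtime).
    tree = ast.parse(textwrap.dedent(inspect.getsource(fn)))
    return {
        node.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load)
    }

Here a direct call like log_transform(df[c]) would be found, but a method resolved through a class hierarchy or a function stored in a dict would not.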

I'll update this ticket when we have hash_includes implemented. We're currently testing it out internally.

Are there any updates here? This would be very useful.

We're running into a similar situation: we update sub-tasks and want the scheduler to descend the DAG, reach the changed nodes, and resume execution from there.