Examples for using fipy.steppers.pidStepper

259 Views Asked by At

I'm using FiPy to simulate a moderately large system of coupled PDEs (11 equations) that calculate the time-dependent evolution of a biochemical advection-diffusion-reaction network. Currently I'm using a fixed time step, and it works fine. However, the nature of the model is such that during the initial time steps the variables change a lot, this requires a small value of dt. After a while the model 'burns in' and the variable changes are much more gradual with time. To save on computational resources, the dt could then be increased. Looking at the documentation, the fipy.steppers.pidStepper seems ideally suited to this. The documentation on the stepper is, however, very scant and I could not find any code examples that have implemented this (either in the FiPy examples themselves, or from a general internet search).

The skeleton of the evaluation loop currently looks as follows (I'd be happy to provide more details if needed); v is an array of 11 cell variables, and p[j] is the PDE for v[j]:

# steps, err and u defined elsewhere 
for step in range(steps):
    r = [1e3] * len(v)
    while max(r) > err:
        for j in range(len(v)):
            r[j] = min(r[j], p[j].sweep(v[j], dt=u))

    for j in range(len(v)):
        v[j].updateOld()

Any pointers on how to adapt this for pidStepper would be appreciated. One requirement would be that I need to be able to specify the saving of model outputs at a set of given time points. These are however much further apart than the intial value of dt used. But if the step size is adaptive, it might not 'land' exactly on one of these time points, so how to implement this?

1

There are 1 best solutions below

1
On

PIDStepper is pretty archaic (last substantive change was about fourteen years ago) and, as you've found, is neither documented nor used anywhere publicly. Since we don't test it, either, we should probably cornfield it.

That said, after rummaging around on my hard drive, I've found a few cases where I experimented with it and, amazingly, it still works.

Using a trivial diffusion problem as an example:

import fipy as fp
from fipy.tools import numerix

nx = 50
dx = 1.
L = nx * dx
mesh = fp.Grid1D(nx=nx, dx=dx)
x = mesh.cellCenters[0]

phi = fp.CellVariable(mesh=mesh, name=r"$\phi$", hasOld=True)

phi.value = 0.
phi.setValue(1., where=(x > L/2. - L/10.) & (x < L/2. + L/10.))

viewer = fp.Viewer(vars=phi, datamin=-0.1, datamax=1.1)

D = 1.
eq = fp.TransientTerm() == fp.DiffusionTerm(D * (1 - phi))

total_elapsed = 0.

dt = 100. * dx**2 / (2 * D)
totaltime = 1000.
checkpoints = (fp.numerix.arange(int(totaltime / dt)) + 1) * dt

To solve this with PIDStepper, you need to define a sweepFn() to actually solve your equation(s). The vardata argument is a tuple of tuples, each of which holds a CellVariable to solve for, the equation to apply, and the old-style boundary conditions to use (all of this radically predates coupled equations or constraints). The dt argument is the adapted time step to attempt. *args and **kwargs are any additional agruments you wish to pass at each step. This function must return an error normalized to one; here we return the L1 norm of var - var.old normalized to the L1 norm of var.old.

def L1error(var):
    denom = numerix.L1norm(var.old)
    return numerix.L1norm(var - var.old) / (denom + (denom == 0))

def sweepFn(vardata, dt, *args, **kwargs):
    error = 0.
    for var, eqn, bcs in vardata:
        res = eqn.sweep(var=var, 
                        dt=dt,
                        boundaryConditions=bcs)
        error = max(error, L1error(var))

    return error

You can optionally define a successFn() to perform after a successful adaptive solution step. The vardata argument is as above. The dt argument is the time step that was requested. The dtPrev argument is the time step that was actually taken. The elapsed argument is how much time is elapsed for this time step. *args and **kwargs are any additional agruments you wish to pass at each step and must be the same as for sweepFn(). Here, we'll want access to the viewer, the list of time steps that were taken dts, the list of elapsed times each was taken at elapseds, and the global clock total_elapsed.

def successFn(vardata, dt, dtPrev, elapsed, viewer, dts, elapseds, total_elapsed, *args, **kwargs):
    dts.append(dtPrev)
    elapseds.append(total_elapsed + elapsed)
    viewer.plot()

You could also optionally define a failFn() to perform whenever sweepFn() returns an error greater than one. This would take the same arguments as successFn().

Finally, instantiate a stepper with appropriate vardata and call step() for each desired time step. Pass the desired time step dt, the time step to try first dtTry, the smallest time step to allow dtMin, the last time step attempted dtPrev, the sweepFn() we just defined, the successFn() we just defined, and the optional arguments that our successFn() makes use of.

from fipy.steppers import PIDStepper
stepper = PIDStepper(vardata=((phi, eq, ()),))

pid_dts = []
pid_elapseds = []

dtMin = dt / 1000
dtTry = dtMin
dtPrev = dtTry
for until in checkpoints:
    dtPrev, dtTry = stepper.step(dt=dt, dtTry=dtTry, 
                                 dtMin=dtMin, dtPrev=dtPrev,
                                 sweepFn=sweepFn,
                                 successFn=successFn,
                                 viewer=viewer,
                                 dts=pid_dts,
                                 elapseds=pid_elapseds,
                                 total_elapsed=total_elapsed)
    total_elapsed += dt

You could also do a similar thing by defining a PseudoRKQSStepper:

stepper = PseudoRKQSStepper(vardata=((phi, eq, ()),))

and use the same sweepFn() and successFn().

My current practice

While this works, I found the whole process of defining and passing around helper functions to be kind of opaque and more trouble than it's worth. Until now, I couldn't be bothered to document it or show anybody else how I'd used it.

These days, I do something more like this:

explicit_dts = []
explicit_elapseds = []

dt = dt / 1000
for until in checkpoints:
    while total_elapsed < until:
        phi.updateOld()
        dt_until = (until - total_elapsed)
        dt_save = dt
        if dt_until < dt:
            dt = dt_until
        res = eq.sweep(var=phi, dt=dt)
        
        if L1error(phi) < 1.:
            total_elapsed += dt

            explicit_dts.append(dt)
            explicit_elapseds.append(total_elapsed)

            dt = dt_save
            
            dt *= 1.2
            
            viewer.plot()
        else:
            phi.value = phi.old.value
            dt /= 2.

This involves about the same amount of code in your script (a little less, actually) and none of the additional 100+ lines of code in PIDStepper and Stepper.

None of these have been tuned, and this diffusion problem isn't much of a test as it's unconditionally stable, but once they get going, all exhibit about the same acceleration. enter image description here