Vince
12/20/2021, 9:35 PM
Ben Muller
12/20/2021, 11:00 PM
Head <https://registry-1.docker.io/v2/prefecthq/prefect/manifests/0.15.5-python3.8>: received unexpected HTTP status: 503 Service Unavailable
Yusuf Khan
12/21/2021, 2:21 AM
Yehor Sikachov
12/21/2021, 11:41 AM
Liri Rozenthal
12/21/2021, 2:56 PM
John-Craig Borman
12/21/2021, 4:03 PM
Jason Motley
12/21/2021, 4:27 PM
Alejandro Sanchez Losa
12/21/2021, 5:27 PM
Max Watermolen
12/21/2021, 5:35 PM
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AppRegistryNotReady("Apps aren\'t loaded yet.")')
Trevor Campbell
12/21/2021, 5:41 PM
a.log as an input, and produces another file b.log as an output, and I want the task to run only when the timestamp of a.log is newer than b.log (in addition to the usual waiting for predecessor Prefect tasks). I could of course just do this manually inside the task, but was wondering if there was a better way to go about it
Vipul
12/21/2021, 6:45 PM
Jason Motley
12/21/2021, 7:12 PM
flow.run_config = ECSRun(
    env={"EXTRA_PIP_PACKAGES": "requests" "numpy"})
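A minimal sketch of one way to pass several packages, assuming that EXTRA_PIP_PACKAGES is read as a space-separated list (an assumption, not confirmed in this thread). Python joins adjacent string literals, so "requests" "numpy" in the snippet above evaluates to the single string "requestsnumpy". The helper name build_extra_pip_env is hypothetical.

```python
from typing import Dict, Iterable


def build_extra_pip_env(packages: Iterable[str]) -> Dict[str, str]:
    """Return an env mapping with the package names joined by single spaces."""
    return {"EXTRA_PIP_PACKAGES": " ".join(packages)}


# Adjacent string literals are concatenated by the parser, with no separator.
accidental = "requests" "numpy"

# Joining explicitly keeps the package names separate.
env = build_extra_pip_env(["requests", "numpy"])

print(accidental)
print(env)
```

The resulting dict could then be passed as ECSRun(env=env).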
Jason Motley
12/21/2021, 10:12 PM
Daniel Komisar
12/21/2021, 10:13 PM
_has_key. I’ve tried using _contains with no luck, although I’m not sure if that’s the right one either, or if this is even possible. Thanks!
Danny Vilela
12/21/2021, 11:10 PM
IntervalSchedule with pendulum (since that’s what I’ve used for daily/weekly tasks) but he noticed that the results don’t quite line up with what he was expecting:
import pendulum
from prefect.schedules.schedules import IntervalSchedule
from prefect.schedules.clocks import CronClock
# Set our start date.
next_start_date: pendulum.DateTime = (
    pendulum.now(tz="America/Los_Angeles")
    .start_of(unit="month")
    .set(day=2, hour=8, minute=0, second=0)
)
# Set our monthly interval.
monthly: pendulum.Duration = pendulum.duration(months=1)
# Inspect the next few clock emissions.
schedule: IntervalSchedule = IntervalSchedule(start_date=next_start_date, interval=monthly)
print(schedule.next(n=3))
# [
#     DateTime(2022, 1, 1, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')),
#     DateTime(2022, 1, 31, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')),
#     DateTime(2022, 3, 2, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles'))
# ]
Why does the IntervalSchedule not fire on 2022-01-02, 2022-02-02, 2022-03-02, etc? It appears to just be incrementing by 30 days, but that’s not quite what I’d expect. Is this a pendulum thing?
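One possible explanation, sketched with the standard library only: if the schedule reduces the interval to a fixed length of time (assumed here, not confirmed in this thread, to be 30 days for a one-month duration), repeated additions drift away from the calendar day, whereas stepping the month field keeps the day fixed. The helpers fixed_step and calendar_step are hypothetical names used for illustration.

```python
from datetime import datetime, timedelta
from typing import List


def fixed_step(start: datetime, n: int, days: int = 30) -> List[datetime]:
    """Emit n dates by repeatedly adding a fixed number of days."""
    return [start + timedelta(days=days * i) for i in range(n)]


def calendar_step(start: datetime, n: int) -> List[datetime]:
    """Emit n dates on the same day of consecutive months.

    Only safe for days 1 to 28, which exist in every month.
    """
    dates = []
    for i in range(n):
        months = start.month - 1 + i
        dates.append(
            start.replace(year=start.year + months // 12, month=months % 12 + 1)
        )
    return dates


start = datetime(2022, 1, 2, 8, 0)
print(fixed_step(start, 3))     # drifts: Jan 2, Feb 1, Mar 3
print(calendar_step(start, 3))  # stays on the 2nd: Jan 2, Feb 2, Mar 2
```

If a fixed day of the month is what is wanted, a cron-style clock such as the CronClock imported above, with an expression like "0 8 2 * *", may be a better fit than an interval.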
(Edit: it’s maybe worth noting that in the example above, just doing next_start_date + monthly does give you the correct DateTime(2022, 1, 2, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')). So I think it may actually be a Prefect thing?)
Brian S
12/22/2021, 1:41 AM
Ryan Sattler
12/22/2021, 5:32 AM
rilshok
12/22/2021, 8:27 AM
Alfredo Prada Giorgi
12/22/2021, 8:56 AM
rilshok
12/22/2021, 8:56 AM
@task
def get_paths() -> List[Path]:
    ...
@task
def dosmth(path: Path) -> Any:
    ...
@task
def finally(path: Path, smth: Any):
    ...
with Flow('fucking-prefect') as flow:
    paths = get_paths()
    smth = dosmth.map(paths)  # maybe there are exceptions
    # TODO: need to synchronize paths and smth
    finally.map(paths, smth)
:dusty_stick: I implemented my filter like prefect.tasks.control_flow.FilterTask
from typing import List, Any, Tuple, Union
from prefect import Task
from prefect.triggers import all_finished
class CrossSkip(Task):
    def __init__(self, *skip, **kwargs) -> None:
        kwargs.setdefault("skip_on_upstream_skip", False)
        kwargs.setdefault("trigger", all_finished)
        self._types = tuple([s for s in skip if isinstance(s, type)])
        self._values = [s for s in skip if not isinstance(s, type)]
        if not skip:
            self._types = (type(None), )
        super().__init__(**kwargs)
    def _filter(self, value) -> bool:
        return not isinstance(value, self._types) and not any([value == v for v in self._values])
    def run(self, *task_results: List[Any]) -> Union[List[Any], Tuple[List[Any], ...]]:
        """Task run method."""
        assert task_results
        assert len({*map(len, task_results)}) == 1
        if len(task_results) == 1:
            return [r for r in task_results[0] if self._filter(r)]
        return tuple([*map(list, zip(*[
            r for r in zip(*task_results)
            if all([self._filter(v) for v in r])
        ]))])
The flow should have turned into something like this, and everything would have worked fine:
with Flow('best-prefect-flow') as flow:
    paths = get_paths()
    smth = dosmth.map(paths)  # maybe there are exceptions
    # >>>
    paths, smth = CrossSkip(Exception, None)(paths, smth)
    # <<<
    finally.map(paths, smth)
BUT
ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
In general, I know how to use Python magic to solve this problem, but I refuse to conjure further 🙂
TLDR: Prefect's tasks can't unpack arguments:
@task
def todosmth(*arg) -> Any:
    ...
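A minimal sketch of the workaround the error message hints at: accept the parallel result lists as one list-of-lists argument instead of *args, so everything is passed under a single keyword. This is plain Python with a hypothetical helper name cross_skip; wrapping it in a Prefect task is an assumption and is not shown.

```python
from typing import Any, List, Tuple


def cross_skip(
    columns: List[List[Any]],
    skip_types: Tuple[type, ...] = (Exception, type(None)),
) -> List[List[Any]]:
    """Drop every position at which any column holds a skipped value."""
    if not columns:
        return []
    if len({len(column) for column in columns}) != 1:
        raise ValueError("all columns must have the same length")
    # Walk the columns row by row and keep only the fully valid rows.
    kept_rows = [
        row
        for row in zip(*columns)
        if not any(isinstance(value, skip_types) for value in row)
    ]
    if not kept_rows:
        return [[] for _ in columns]
    # Transpose the surviving rows back into columns.
    return [list(column) for column in zip(*kept_rows)]


paths = ["a", "b", "c"]
smth = [1, ValueError("boom"), 3]
kept_paths, kept_smth = cross_skip([paths, smth])
print(kept_paths, kept_smth)
```

Passing [paths, smth] as a single argument keeps the call keyword-friendly while still filtering both lists in step.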
Martim Lobao
12/22/2021, 10:23 AM
Paul Gierz
12/22/2021, 10:59 AM
lat_size = Parameter("Latitude Size (e.g 1 for a 1x1 degree grid)", default=1.0)
lon_size = Parameter("Longitude Size (e.g 1 for a 1x1 degree grid)", default=1.0)
lats = np.arange(-90, 90, lat_size)
lons = np.arange(-180, 180, lon_size)
but then:
$ prefect register --project tutorial -p simulation_workflows/workflows
Collecting flows...
osgeo is not installed, conversion to Geo formats like Geotiff (fesom2GeoFormat) will not work.
Error loading 'simulation_workflows/workflows/fesom_2d_variable.py':
Traceback (most recent call last):
  File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/site-packages/prefect/cli/build_register.py", line 134, in load_flows_from_script
    namespace = runpy.run_path(abs_path, run_name="<flow>")
  File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 268, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/pgierz/Documents/SciComp/Projects/Workflows/simulation_workflows/simulation_workflows/workflows/fesom_2d_variable.py", line 27, in <module>
    lons = np.arange(-180, 180, lon_size.value)
AttributeError: 'Parameter' object has no attribute 'value'
Is the stupid solution just to make a mini task instead of directly using numpy?
Tom Klein
12/22/2021, 11:51 AM
Eduardo Fernández León
12/22/2021, 12:09 PM
runner token that I wasn't able to find in the Cloud UI. Thanks in advance.
Robert Kowalski
12/22/2021, 2:46 PM
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 188, in interrupt_if_cancelling
    flow_run_info = self.client.get_flow_run_info(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1564, in get_flow_run_info
    raise ClientError('Flow run ID not found: "{}"'.format(flow_run_id))
prefect.exceptions.ClientError: Flow run ID not found: "0695cb92-7995-43b1-abf7-6500eb7e9fc0"
The flow freezes on one task, which inserts data into InfluxDB. I have two instances of this task with two different database configs, and both run at the same time. One instance executes correctly; the second never ends. Does anybody have an idea what might be causing this log error or why the task is not ending?
Philip MacMenamin
12/22/2021, 3:19 PM
Dimosthenis Schizas
12/22/2021, 3:31 PM
PREFECT__CLOUD__AGENT__AUTH_TOKEN which is deprecated if I understand correctly. Is it correct to assume that I can remove that arg and keep only the PREFECT__CLOUD__API_KEY?
Paul Gierz
12/22/2021, 4:34 PM
import os
import pathlib
import re
import prefect
from prefect import task
@task
def get_n_newest_files_for_pattern(pattern: str, path: str, n: int) -> list:
    """
    Task to get the n newest files for a given pattern.
    """
    logger = prefect.context.get("logger")
    logger.info(f"Getting {n} newest files in {path} for pattern {pattern}")
    path_files = os.listdir(path)
    files_with_path = [os.path.join(path, f) for f in path_files]
    files = [pathlib.Path(f) for f in files_with_path if re.search(pattern, f)]
    logger.info(f"Found {len(files)} files for pattern {pattern}")
    logger.debug(f"Files: {files}")
    logger.info("Sorting files by modification time")
    files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
    logger.info(f"Returning the {n} newest files")
    return files[:n]
I am getting file not found errors with:
FileNotFoundError: [Errno 2] No such file or directory: '<Parameter: Path to the top level of the experiment tree>/outdata/fesom'
I thought that once it was loaded, any Parameter would behave as whatever type it is supposed to be?
It is defined like this:
path = Parameter(name="Path to the top level of the experiment tree")
Before that, I do an f-string conversion once:
outdata_path = f"{path}/outdata/fesom"
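A minimal sketch of why the f-string fails, using a hypothetical Placeholder class as a stand-in for a value that is only known at run time (it is not Prefect's Parameter). An f-string is evaluated as soon as the line runs, so formatting it while the flow is being built embeds the placeholder's text representation. Moving the formatting into a function that is called with the real value avoids that. The path "/work/experiment" is a made-up example value.

```python
class Placeholder:
    """Hypothetical stand-in for a value that is only resolved at run time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


def build_outdata_path(path: str) -> str:
    """Format the path only once the real value is available."""
    return f"{path}/outdata/fesom"


path = Placeholder("Path to the top level of the experiment tree")

# Formatted immediately: the placeholder's repr ends up inside the string.
eager = f"{path}/outdata/fesom"

# Formatted later, with the resolved value passed in.
deferred = build_outdata_path("/work/experiment")

print(eager)
print(deferred)
```

In a flow, build_outdata_path could be wrapped as a task and called with the Parameter, so that the formatting happens at run time rather than while the flow is being defined.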
Not having f-strings would be possible, but a bit annoying
Pedro Machado
12/22/2021, 7:46 PM
Leon Kozlowski
12/22/2021, 7:56 PM
Leon Kozlowski
12/22/2021, 7:56 PM
Michael Adkins
12/22/2021, 8:02 PM
Leon Kozlowski
12/22/2021, 9:34 PM
Michael Adkins
12/22/2021, 10:13 PM