Mihai H
12/09/2019, 3:17 PM
Mihai H
12/09/2019, 3:18 PM
Mihai H
12/09/2019, 3:19 PM
Mihai H
12/09/2019, 3:19 PM
Mihai H
12/09/2019, 3:20 PM
Mihai H
12/09/2019, 3:20 PM
Mihai H
12/09/2019, 3:20 PM
Mihai H
12/09/2019, 3:21 PM
Luke Orland
12/09/2019, 5:04 PM
.map ?
Walter Gillett
12/09/2019, 10:49 PM
CJ Wright
12/09/2019, 10:52 PM
Arsenii
12/10/2019, 11:58 AM
SKIP halfway if some condition is met
3. Map a function to the results of step 2
I'm coming from an Airflow background and was expecting a none_skipped trigger to apply to step 3, with the expected behavior that it would run if and only if step 2 succeeded. But as far as I understand, all triggers in Prefect treat skipping as success, which is not what I want.
So my question is: is this designed to deter people from using Prefect in a non-Prefect way? Should I instead use branching logic? Or is it a known missing feature to be implemented later?
This is easily workaround-able with some manual checking logic in step 3 but still.
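The kind of manual check meant here could look roughly like the following in plain Python. This is only a sketch of the idea and does not use the Prefect API: `SKIPPED`, `step_two`, and `step_three` are hypothetical names, and the skip condition (a negative input) is invented for illustration.

```python
SKIPPED = object()  # hypothetical sentinel standing in for a skipped result


def step_two(x):
    """Return a result, or the SKIPPED sentinel when the skip condition is met."""
    if x < 0:  # invented skip condition, for illustration only
        return SKIPPED
    return x * 2


def step_three(result):
    """Do real work only on real results; pass the sentinel through untouched."""
    if result is SKIPPED:
        return SKIPPED
    return result + 1


inputs = [1, -2, 3]
step_two_results = [step_two(x) for x in inputs]
step_three_results = [step_three(r) for r in step_two_results]

# Keep only the elements that actually ran through both steps.
completed = [r for r in step_three_results if r is not SKIPPED]
print(completed)
```

The check lives at the top of step 3, so each mapped element decides for itself whether to run.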
Thanks!
DiffyBron
12/10/2019, 12:25 PM
#!/usr/bin/env python
import random

from prefect.triggers import all_successful, all_failed
from prefect import task, Flow


@task(name="Task A")
def task_a():
    rand_num = float(random.random())
    curr_limits = float(0.5)
    if rand_num < curr_limits:
        raise ValueError(f'{rand_num} is less than {curr_limits}')
    return rand_num


@task(name="Task B", trigger=all_successful)
def task_b():
    pass


@task(name="Task C", trigger=all_failed)
def task_c():
    pass


if __name__ == '__main__':
    with Flow('My triggers') as flow:
        success = task_b(upstream_tasks=[task_a])
        fail = task_c(upstream_tasks=[task_a])
    flow.set_reference_tasks([success])
    flow_state = flow.run()
David Ojeda
12/10/2019, 8:02 PM
DiffyBron
12/10/2019, 8:30 PM
schedules.Schedule(
    # fire every day
    clocks=[schedules.clocks.IntervalClock(timedelta(days=1))],
    # but only on weekdays
    filters=[filters.is_weekday],
    # and only at 8:15am, 9:30am, 3:50pm, or 4pm
    or_filters=[
        filters.between_times(pendulum.time(hour=8, minute=15), pendulum.time(hour=8, minute=15)),
        filters.between_times(pendulum.time(hour=9, minute=30), pendulum.time(hour=9, minute=30)),
        filters.between_times(pendulum.time(hour=15, minute=50), pendulum.time(hour=15, minute=50)),
        filters.between_times(pendulum.time(hour=16), pendulum.time(hour=16)),
    ],
    not_filters=[
        filters.between_dates(12, 25, 12, 25)
    ]
)
Giang Hoang Le
12/10/2019, 11:03 PM
Brett Naul
12/11/2019, 12:50 AM
BigQueryLoadGoogleCloudStorage task (which I wrote but that isn’t really helping 😬) and passing in schema as a list; by the time schema is used in run, somehow the order of the list is being altered, causing the columns to be mis-identified. any idea what might be messing with the schema input to my task? some kind of argument validation or copy maybe…?
context: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/google/bigquery.py#L346-L409
itay livni
12/11/2019, 6:31 AM
ifelse and select case (IMO) 🙂
DiffyBron
12/11/2019, 9:30 AM
#!/usr/bin/env python
import pendulum
from datetime import timedelta

from prefect import task, Flow, schedules
from prefect.schedules import filters, Schedule


@task
def say_hello():
    print("hello world")


if __name__ == '__main__':
    curr_schedule = Schedule(
        # Fire every min
        clocks=[
            schedules.clocks.IntervalClock(
                interval=timedelta(minutes=1),
                start_date=pendulum.datetime(2019, 1, 1, tz='America/New_York'),
            )
        ],
        # Only on weekdays
        filters=[filters.is_weekday],
        # and only at 8.15am, 9.30am, 3.50pm, 4pm
        or_filters=[
            filters.between_times(pendulum.time(hour=8, minute=15), pendulum.time(hour=8, minute=15)),
            filters.between_times(pendulum.time(hour=9, minute=30), pendulum.time(hour=9, minute=30)),
            filters.between_times(pendulum.time(hour=15, minute=50), pendulum.time(hour=15, minute=50)),
            filters.between_times(pendulum.time(hour=16), pendulum.time(hour=16)),
        ],
        # do not run on Christmas
        not_filters=[
            filters.between_dates(12, 8, 12, 25)
        ]
    )
    with Flow('Sounds alerts', curr_schedule) as flow:
        say_hello()
    flow.run()
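How the three filter groups in the schedule above combine can be sketched in plain Python with only the standard library. This assumes, going by my reading of the Prefect schedule docs, that a clock tick is emitted only when every entry in filters passes, at least one entry in or_filters passes, and no entry in not_filters passes. The helpers below borrow the Prefect names but are stand-ins written for this sketch, not the library's code, and the Christmas-only exclusion is taken from the earlier snippet.

```python
from datetime import datetime, time, timedelta


def is_weekday(dt):
    return dt.weekday() < 5  # Monday=0 ... Friday=4


def between_times(start, end):
    # Simplified: does not handle ranges that wrap past midnight.
    return lambda dt: start <= dt.time() <= end


def between_dates(start_month, start_day, end_month, end_day):
    # Simplified: does not handle ranges that wrap past the new year.
    return lambda dt: (start_month, start_day) <= (dt.month, dt.day) <= (end_month, end_day)


def passes(dt, filters=(), or_filters=(), not_filters=()):
    """All `filters`, at least one of `or_filters`, none of `not_filters`."""
    return (
        all(f(dt) for f in filters)
        and (not or_filters or any(f(dt) for f in or_filters))
        and not any(f(dt) for f in not_filters)
    )


def emitted(start, end, interval, **filter_groups):
    """Walk clock ticks from start (inclusive) to end (exclusive), keeping those that pass."""
    ticks, tick = [], start
    while tick < end:
        if passes(tick, **filter_groups):
            ticks.append(tick)
        tick += interval
    return ticks


FILTERS = dict(
    filters=[is_weekday],
    or_filters=[
        between_times(time(8, 15), time(8, 15)),
        between_times(time(9, 30), time(9, 30)),
        between_times(time(15, 50), time(15, 50)),
        between_times(time(16), time(16)),
    ],
    not_filters=[between_dates(12, 25, 12, 25)],
)

monday = datetime(2019, 12, 9)
next_day = monday + timedelta(days=1)
every_minute = emitted(monday, next_day, timedelta(minutes=1), **FILTERS)
every_day = emitted(monday, next_day, timedelta(days=1), **FILTERS)

print([t.strftime("%H:%M") for t in every_minute])
print(every_day)
```

Under these assumptions a one-minute clock yields the four target times, while a one-day clock whose ticks land on midnight never matches any of the time windows. That would explain why the interval matters as much as the filters.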
Dean Magee
12/12/2019, 12:22 AM
Philip Billaudelle
12/12/2019, 11:44 AM
DeprecationWarning, which seems to lead to an ERROR in my prefect Runner:
ERROR - prefect.FlowRunner | Unexpected error: DeprecationWarning("Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 482, in get_flow_run_state
    key_states = set(flatten_seq([all_final_states[t] for t in reference_tasks]))
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/collections.py", line 29, in flatten_seq
    if isinstance(item, collections.Iterable) and not isinstance(
  File "/usr/local/lib/python3.7/collections/__init__.py", line 52, in __getattr__
    DeprecationWarning, stacklevel=2)
DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
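Two things seem to be going on in the traceback above, and both can be sketched with the standard library alone. First, the warning goes away when the ABC is imported from `collections.abc`. Second, a warning only surfaces as an exception when a warnings filter is set to "error". That such a filter is active in this environment is an assumption; the log does not show it. The `flatten_seq` below is a hypothetical stand-in, not Prefect's implementation, and `warns_then_returns` is invented for the demonstration.

```python
import warnings
from collections.abc import Iterable  # not `collections.Iterable`


def flatten_seq(seq):
    """Yield the leaves of an arbitrarily nested sequence, left to right."""
    for item in seq:
        # Strings and bytes are iterable but are treated as leaves here.
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            yield from flatten_seq(item)
        else:
            yield item


def warns_then_returns():
    """Invented helper: emits a DeprecationWarning, then returns normally."""
    warnings.warn("deprecated access", DeprecationWarning)
    return "ok"


flat = list(flatten_seq([1, [2, [3, "ab"]], (4,)]))

# With an "error" filter the same call raises instead of merely warning.
with warnings.catch_warnings():
    warnings.simplefilter("error", DeprecationWarning)
    try:
        warns_then_returns()
        escalated = False
    except DeprecationWarning:
        escalated = True

print(flat, escalated)
```

If that assumption holds, candidates to check would be `-W error`, `PYTHONWARNINGS`, or a test-runner setting that escalates warnings.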
Wayne
12/12/2019, 2:09 PM
Andrew Vaccaro
12/12/2019, 5:28 PM
Braun Reyes
12/13/2019, 6:12 AM
Braun Reyes
12/13/2019, 6:14 AM
flow_id = prefect.context.get("flow_id", "unknown")[:8]
flow_version = int(prefect.context.get("version", "0"))
Braun Reyes
12/13/2019, 3:41 PM
2019-12-13 1:40am prefect.CloudFlowRunner INFO Beginning Flow run for 'dbt_run'
2019-12-13 1:40am prefect.CloudFlowRunner INFO Starting flow run.
2019-12-13 1:40am prefect.CloudFlowRunner ERROR Unexpected error: TypeError("start() missing 1 required positional argument: 'self'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.7/contextlib.py", line 239, in helper
    return _GeneratorContextManager(func, args, kwds)
  File "/usr/local/lib/python3.7/contextlib.py", line 82, in __init__
    self.gen = func(*args, **kwds)
TypeError: start() missing 1 required positional argument: 'self'
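One way to get exactly this traceback shape is for `executor` to be a class rather than an instance when `executor.start()` is called. Whether that is what happened here is an assumption, since the log does not show how the executor was configured. The `Executor` class and `run_with` function below are hypothetical and only reproduce the mechanism with `contextlib.contextmanager`.

```python
from contextlib import contextmanager


class Executor:
    """Hypothetical executor whose start() is a generator-based context manager."""

    @contextmanager
    def start(self):
        yield self


def run_with(executor):
    # Mirrors the `with executor.start():` call seen in the traceback.
    with executor.start() as active:
        return type(active).__name__


ok = run_with(Executor())  # an instance: start() receives self

try:
    run_with(Executor)  # the class itself: nothing is bound to self
    message = None
except TypeError as exc:
    message = str(exc)

print(ok)
print(message)
```

Passing the instance works, while passing the class fails inside contextlib before the with-block body ever runs. If this is the cause, the executor the environment hands to the flow runner is the first thing to check.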
This was a fargate task started by the fargatetaskenvironment
Stuart Young
12/13/2019, 5:13 PM
orcaman
12/15/2019, 11:46 AM
orcaman
12/15/2019, 11:48 AM
Aliza Rayman
12/15/2019, 5:14 PM
lz4 error and my whole flow fails. Anyone have advice?