Thomas Furmston
10/18/2021, 9:49 AM
import logging
import pendulum
import prefect
from prefect import Flow, Parameter, task
from prefect.storage import Docker
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun
logger = logging.getLogger(__name__)
weekday_schedule = CronSchedule(
    '41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London')
)
@task
def calculate_flow_end_date(end_date: str):
    if end_date is not None:
        return end_date
    return prefect.context.get('scheduled_start_time').to_date_string()
common_flow = StartFlowRun(
    flow_name='already_existing_flow1',
    project_name='my_project_name',
    wait=True,
)
baseline_flow = StartFlowRun(
    flow_name='already_existing_flow2',
    project_name='my_project_name',
    wait=True,
)
with Flow("my_scheduled_flow",
          schedule=weekday_schedule,
          storage=Docker(
              base_image='my-docker-image:latest',
              local_image=True,
          )) as flow:
    num_days_parameter = Parameter('num_days', default=1)
    num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
    end_date_parameter = Parameter('end_date', default=None)
    task_end_date = calculate_flow_end_date(end_date_parameter)
    logger.info('Task End Date: %s', task_end_date)
    logger.info('Num Days: %s', num_days_parameter)
    logger.info('Num Days Backfill: %s', num_back_fill_days_parameter)
    common_flow_result = common_flow(parameters={
        'num_days': num_days_parameter,
        'num_back_fill_days': num_back_fill_days_parameter,
        'end_date': task_end_date,
    })
    baseline_flow_result = baseline_flow(
        upstream_tasks=[common_flow_result],
        parameters={
            'num_days': num_days_parameter,
            'num_back_fill_days': num_back_fill_days_parameter,
            'end_date': task_end_date,
        }
    )
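Note that the `logger.info(...)` calls inside the `with Flow(...)` block run once, while the task graph is being built, so they log the `Parameter` and task objects rather than the values a flow run receives. A minimal sketch, assuming Prefect 1.x: do the logging inside the task body, where `prefect.context.get("logger")` returns the run's logger, and convert `scheduled_start_time` to the schedule's timezone before taking its date. That Prefect hands the scheduled time back in UTC is an assumption to check on your version; `resolve_end_date` and `make_calculate_flow_end_date_task` are hypothetical names. Prefect is imported inside the factory so the date helper can be exercised without it.

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Optional


def resolve_end_date(end_date: Optional[str], scheduled_start: datetime, tz: tzinfo) -> str:
    """Return end_date if given, else the scheduled start's calendar date in `tz`."""
    if end_date is not None:
        return end_date
    # Convert first: near midnight the UTC date and the local date can differ.
    return scheduled_start.astimezone(tz).date().isoformat()


def make_calculate_flow_end_date_task(tz: tzinfo):
    """Build a Prefect 1.x task that resolves and logs the end date at run time."""
    import prefect  # imported lazily; only needed when the flow is built
    from prefect import task

    @task
    def calculate_flow_end_date(end_date: Optional[str] = None) -> str:
        logger = prefect.context.get("logger")  # the flow run's logger
        scheduled = prefect.context.get("scheduled_start_time")
        resolved = resolve_end_date(end_date, scheduled, tz)
        logger.info("Task End Date: %s", resolved)  # logs the actual value
        return resolved

    return calculate_flow_end_date


if __name__ == "__main__":
    # Fixed +01:00 offset standing in for Europe/London in mid-October (BST).
    bst = timezone(timedelta(hours=1), "BST")
    late_utc = datetime(2021, 10, 18, 23, 30, tzinfo=timezone.utc)
    print(resolve_end_date(None, late_utc, bst))  # 2021-10-19
    print(resolve_end_date("2021-10-01", late_utc, bst))  # 2021-10-01
```

In the flow module this would stand in for the `@task` definition above, e.g. `calculate_flow_end_date = make_calculate_flow_end_date_task(pendulum.timezone('Europe/London'))` at module level, with the `with Flow(...)` block unchanged apart from dropping the build-time `logger.info` calls.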
Thomas Furmston
10/18/2021, 9:50 AM
Thomas Furmston
10/18/2021, 9:50 AM
from prefect import Flow, Parameter
from prefect.storage import Docker
from prefect.tasks.shell import ShellTask
# construct_etl_command and settings are defined elsewhere in the poster's project (not shown)
with Flow(
    'common_flow',
    storage=Docker(
        base_image='my-docker-image:latest',
        local_image=True,
    )) as flow:
    num_days_parameter = Parameter('num_days', required=True)
    num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
    end_date_parameter = Parameter('end_date', required=True)
    served_ads_command = ShellTask(
        name='my_first_task',
        command=construct_etl_command(
            app_mode=settings.app_mode,
            table_name='task1',
            num_days_cli_arg='num_back_fill_days',
            num_days=num_back_fill_days_parameter,
            end_date=end_date_parameter,
            database=settings.database,
        ),
        stream_output=True,
    )
Thomas Furmston
10/18/2021, 9:51 AM
Thomas Furmston
10/18/2021, 9:51 AM
[2021-10-18 09:30:15+0000] INFO - prefect.served_advert_task | /tmp/prefect-ufbr5w5b: line 1: Parameter:: No such file or directory
[2021-10-18 09:30:15+0000] ERROR - prefect.served_advert_task | Command failed with exit code 1
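The log points at the cause: `construct_etl_command(...)` is called while the `with Flow(...)` block is being built, when `num_back_fill_days_parameter` and `end_date_parameter` are still `Parameter` objects rather than values. Their string form (`<Parameter: ...>`) most likely ended up in the command, where the shell read `<` as an input redirect from a file named `Parameter:`. A minimal sketch, assuming Prefect 1.x: build the command inside a task, so it runs when the flow runs and the parameters have been resolved, then pass the result to the `ShellTask` as its `command` input. `build_etl_command`, its flags and the `python -m etl` entry point are hypothetical stand-ins for `construct_etl_command`; `Docker` storage and the `settings` arguments are left out for brevity.

```python
import shlex


def build_etl_command(table_name: str, num_days_cli_arg: str, num_days: int, end_date: str) -> str:
    """Hypothetical stand-in for construct_etl_command; expects concrete values."""
    return " ".join([
        "python", "-m", "etl",  # hypothetical entry point
        "--table", shlex.quote(table_name),
        f"--{num_days_cli_arg}", shlex.quote(str(num_days)),
        "--end-date", shlex.quote(str(end_date)),
    ])


def build_common_flow():
    """Prefect 1.x wiring; imported lazily so build_etl_command has no Prefect dependency."""
    from prefect import Flow, Parameter, task
    from prefect.tasks.shell import ShellTask

    @task
    def make_command(num_back_fill_days, end_date):
        # Runs at flow-run time, so both arguments are real values, not Parameter objects.
        return build_etl_command("task1", "num_back_fill_days", num_back_fill_days, end_date)

    shell = ShellTask(name="my_first_task", stream_output=True)

    with Flow("common_flow") as flow:
        num_back_fill_days = Parameter("num_back_fill_days", required=True)
        end_date = Parameter("end_date", required=True)
        # The command is a run-time input to ShellTask, not an __init__ argument.
        shell(command=make_command(num_back_fill_days, end_date))
    return flow


if __name__ == "__main__":
    print(build_etl_command("task1", "num_back_fill_days", 1, "2021-10-18"))
```

Keeping the string-building in a plain function means it can be unit-tested without Prefect; only `make_command` needs the flow context, and `shlex.quote` keeps values containing spaces or shell metacharacters from being reinterpreted by the shell.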
Thomas Furmston
10/18/2021, 9:52 AM
Eric Feldman
10/18/2021, 11:23 AM
Barbara Abi Khoriati
10/18/2021, 1:14 PM
Eddie
10/18/2021, 2:21 PM
wait_for_flow_run tasks to fail if the child flow run was a failure? I know that wait_for_flow_run returns a FlowRunView, so I assume it is possible to define another task that raises an exception if the run view's state is_failed(), but I am curious whether there is already an interface for this behavior in the built-in tasks.
Constantino Schillebeeckx
10/18/2021, 6:08 PM
prefect.context.get("scheduled_start_time") timezone aware? If not, is there an assumed timezone of UTC? If I schedule my flow with something like CronSchedule(cron, start_date=pendulum.datetime(2021, 1, 1, tz=tz)), will the "scheduled_start_time" have the same timezone as the cron schedule?
Rowan Gaffney
10/18/2021, 6:21 PM
Anatoly Alekseev
10/18/2021, 6:23 PM
Ben Muller
10/19/2021, 12:44 AM
with Flow(
    name="horse_racing_data",
) as flow:
    dates = get_dates_task(days_back=days_back, days_ahead=days_ahead, dt_format="%d-%b-%Y")
    raw_sectional_data = apply_map(get_puntingform_sectional_dump_task, date=dates)
    spell_stats_data = apply_map(
        query_db_for_df_task, path_to_sql=unmapped("sql/select_spell_count.sql")
    )
    enriched_pf_data = apply_map(
        calculate_runners_spell_stats_task, pf_sectional_df=raw_sectional_data, spell_data=spell_stats_data
    )
I am making multiple separate apply_map calls, and I just wanted to make sure: when calling calculate_runners_spell_stats_task, can I guarantee the order of the returned maps?
What I mean is that raw_sectional_data and spell_stats_data are iterables, and as they are provided to the function it is important that they maintain the same order.
Am I all good here?
Gabi Pi
10/19/2021, 6:40 AM
Eric Feldman
10/19/2021, 8:46 AM
prefect.core.flow.Flow.serialized_hash documentation that if the hash of the flow didn’t change, it won’t be uploaded to the server when calling register.
But I get the same hash every time, and the server keeps creating new versions of the flow 🤔
Stefano Cascavilla
10/19/2021, 9:24 AM
StartFlowRun.run() this error is shown:
Error during execution of task: ClientError([{'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]', 'locations': [{'line': 2, 'column': 5}], 'path': ['create_task_run_artifact'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]'}}}])
We are migrating from version 0.14.22 and are using server as the backend locally.
Can anybody help?
Lukáš Polák
10/19/2021, 9:35 AM
Mark Fickett
10/19/2021, 1:39 PM
haf
10/19/2021, 2:56 PM
Thomas Furmston
10/19/2021, 3:01 PM
Thomas Furmston
10/19/2021, 3:02 PM
Kevin
10/19/2021, 8:09 PM
AZURE_STORAGE_CONNECTION_STRING environment variable or save connection string as Prefect secret.')
John Jacoby
10/19/2021, 8:33 PM
Matt Alhonte
10/19/2021, 9:54 PM
run_config based on params, and then kicked off from the Outer Flow
2. Spinning up a Dask Cluster from within the Flow and submitting tasks to it (and the Flow itself just runs in a small container) https://docs.prefect.io/core/idioms/resource-manager.html https://docs.prefect.io/api/latest/tasks/resources.html
3. Maybe start experimenting with Adaptive Scaling for Dask Clusters?
haf
10/20/2021, 8:06 AM
Adam Everington
10/20/2021, 8:35 AM
with Flow('test_flow', executor=LocalExecutor()) as flow:
    df = sometask_that_returns_a_df()
    count_of_records_in_df = len(df.index)
    execute_task_2(count_of_records_in_df)
Obvs compilation fails because the return type of sometask is a FunctionTask.
Laurens
10/20/2021, 10:12 AM
Raúl Mansilla
10/20/2021, 11:43 AM
Gabi Pi
10/20/2021, 12:11 PM
KubernetesRun?
I tried to do the following:
run_config=KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY
    })
but when I run the flow, I get an error saying:
Error downloading Flow from S3: An error occurred (InvalidAccessKeyId) when calling the GetObject operation: The AWS Access Key Id you provided does not exist in our records.
Any ideas?
John Marx
10/20/2021, 12:46 PM