psimakis
07/31/2020, 11:28 AMChris Martin
07/31/2020, 4:10 PMKyle Pierce
07/31/2020, 5:09 PMJacob (he/him)
07/31/2020, 5:09 PM@task
def get_file_name(table):
file_name = f'bpi/{table}/-000'
return file_name
with Flow("write to s3") as flow:
exports = ['email_acquisitions', 'new_list_acquisitions', 'revenue', 'segmentation']
for export in exports:
file_name = get_file_name(export)
print(file_name)
I was trying to write a function that does some string manipulation to get a s3 path (this is simplified ^) and when I add the @task decorator, file_name
= <Task: get_file_name>, when I just want it to return the string produced. Am i doing something in a non-prefect way?Mitchell Bregman
07/31/2020, 6:37 PMAshish Arora
07/31/2020, 7:15 PMJustin Mooney
07/31/2020, 8:10 PMAdam Roderick
07/31/2020, 8:33 PMAdam Roderick
07/31/2020, 9:18 PMJohn Ramirez
08/01/2020, 6:26 PMclient
object
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Flow 8eb08aff-f439-422f-94ba-31d30af635cb not found', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
David Stern
08/03/2020, 12:52 AMdbt
, which I think we'll use anyway.Rafal
08/03/2020, 12:45 PMCreateNamespacedDeployment
task I am not sure, if it could start server based on docker image. Any suggestions?Matthew Maldonado
08/03/2020, 1:01 PMMatthew Maldonado
08/03/2020, 1:01 PMMatthew Maldonado
08/03/2020, 1:02 PMMatthew Maldonado
08/03/2020, 1:07 PMTrever Mock
08/03/2020, 5:48 PMShawn Horton
08/03/2020, 5:51 PMRafal
08/03/2020, 7:17 PMFábio Alves
08/03/2020, 11:10 PMHannah Amundson
08/03/2020, 11:45 PMAlfie
08/04/2020, 4:10 AMMatthias
08/04/2020, 7:02 AMAmaljith
08/04/2020, 7:42 AMAmit
08/04/2020, 1:17 PM@task
def task_process_data(date_to_process):
if date_to_process is None:
# calculate
date_to_process = "some_value"
def flow_process_daily_data(*args, **kwargs):
with Flow(*args, **kwargs) as flow:
date_to_process = Parameter('date_to_process', default=None, kind=Parameter.POSITIONAL_OR_KEYWORD)
task_process_daily_data(date_to_process)
return flow
flow_process_daily_data(
name="process-data",
schedule=Schedule(clocks=[CronClock("0 14 */1 * *")]),
storage=storage,
)
# Then I build the storage and register the flow here ..
This flow has no parameters ; Click "Run" to launch your flow!
Also, I don't understand why all the examples of Parameters have flow.run()
called, as I don't call it, since it runs on schedule.Ethan Shenker
08/04/2020, 2:17 PMsignals.FAIL()
state, which would then trigger another task. However, within that downstream task, I would also like to allow the user to interact with the flow to set a flag that would determine the outcome of that downstream flow, and this interaction would likely occur while the downstream task was in a paused state. However, I'm struggling with determining how I'd be able to achieve this interaction, as the paused state pauses all elements of the flow, thus preventing the user interaction I require.
Additionally, if there are any other ways that a task can be set to occur both as a result of a failed previous task and manually at the same time, that insight would be greatly appreciated too. Thanks!Fábio Alves
08/04/2020, 4:07 PMRiley Hun
08/04/2020, 5:57 PMMarwan Sarieddine
08/04/2020, 10:30 PMfiles
dictionary and env_vars
- the docker image is built successfully locally and passes the healthchecks, but when the k8s agent tries to create the prefect job I get ModuleNotFoundErrors
Ming Fang
08/05/2020, 1:44 AMAugust 4th 2020 at 8:56:04pm | prefect.CloudTaskRunner
Unexpected error: NameError("name 'prefect' is not defined")
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
value = timeout_handler(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
return fn(*args, **kwargs)
File "minio.py", line 7, in hello_task
NameError: name 'prefect' is not defined