Madison Schott
08/17/2021, 5:08 PM
The name="" argument suggested in the documentation isn't working for the format I have my tasks in:
campaign_details_dbt_run = dbt_task(name="Campaign Details", command='dbt run -m campaign_details')
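(A possible variant to try, sketched under the assumption this is Prefect 1.x and that `dbt_task` is an instance of a Prefect `Task`: the display name can also be overridden at call time inside a flow via `task_args`, rather than at construction:)

```python
from prefect import Flow

# `dbt_task` as defined above; "Campaign Details" is passed via task_args
# at call time instead of as an init kwarg (an assumption worth testing).
with Flow("campaigns") as flow:
    campaign_details_dbt_run = dbt_task(
        command="dbt run -m campaign_details",
        task_args={"name": "Campaign Details"},
    )
```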
Italo Barros
08/17/2021, 6:48 PM

YD
08/17/2021, 6:52 PM
source_code folder, that have local imports, and that we need to create a python package in order to use local code.
Is this correct, or is there some way around it?

Blake Hughes
08/17/2021, 7:24 PM

Blake Hughes
08/17/2021, 7:24 PM

Timothy Byrne
08/17/2021, 8:26 PM
SnowflakeQuery task into a Pandas dataframe?
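(Not from the thread; a minimal sketch of one common pattern for this, assuming the query task hands back rows as a list of tuples the way the Snowflake connector's `cursor.fetchall()` does. The rows and column names below are made up for illustration:)

```python
import pandas as pd

# Stand-in for the rows a SnowflakeQuery-style task would return.
rows = [(1, "alice"), (2, "bob")]
columns = ["id", "name"]  # assumed column names

df = pd.DataFrame.from_records(rows, columns=columns)
print(df.shape)  # (2, 2)
```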
Mark McDonald
08/17/2021, 9:30 PM

YD
08/17/2021, 9:46 PM

Hui Zheng
08/17/2021, 10:04 PM
"PREFECT__TASKS__DEFAULTS__MAX_RETRIES": 2,
"PREFECT__TASKS__DEFAULTS__RETRY_DELAY": 5,
Now I want to set an individual task_A to retry=0 (not to retry at all), and I find it impossible to do that.
I will explain the details in the thread.
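(Not an answer from the thread, just a pure-Python illustration of why a falsy override like retry=0 is easy to lose: if defaults are merged with a truthiness check instead of an explicit None check, 0 silently falls back to the default. Nothing here is Prefect internals; both merge functions are hypothetical:)

```python
DEFAULTS = {"max_retries": 2, "retry_delay": 5}

def merge_truthy(defaults, override):
    # `or` treats 0 as "unset", so the default wins
    return {k: override.get(k) or v for k, v in defaults.items()}

def merge_none_aware(defaults, override):
    # only a missing/None value falls back to the default; 0 is kept
    out = dict(defaults)
    for k, v in override.items():
        if v is not None:
            out[k] = v
    return out

print(merge_truthy(DEFAULTS, {"max_retries": 0}))
# {'max_retries': 2, 'retry_delay': 5}  <- the 0 was lost
print(merge_none_aware(DEFAULTS, {"max_retries": 0}))
# {'max_retries': 0, 'retry_delay': 5}
```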
Carter Kwon
08/17/2021, 10:07 PM
[INACTIVE]. It's particularly strange because the inactive task definition shows "none" for the task role, but there is a role attached. I'll add a screenshot in the thread. The flow finishes successfully and writes to an s3 bucket that requires the role permissions. Any ideas what could be happening?

mithalee mohapatra
08/17/2021, 11:30 PM

Adam Shamlian
08/18/2021, 12:04 AM
flow_id for get_task_run_result when just calling flow.run() for testing, or should I be thinking about this in an entirely different way when testing?
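(A sketch of the usual local-testing pattern, assuming this is Prefect 1.x: `flow.run()` returns a State object whose `.result` dict is keyed by task object, so no `flow_id` or `get_task_run_result` is needed at all. `flow` and `my_task` below stand in for the flow and task under test:)

```python
# assumption: `flow` and `my_task` are the Flow and Task objects under test
state = flow.run()

assert state.is_successful()
task_state = state.result[my_task]  # per-task State, keyed by the task object
value = task_state.result           # the task's actual return value
```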
Alexander Seifert
08/18/2021, 8:43 AM

Adam Everington
08/18/2021, 9:07 AM

Kamil Gorszczyk
08/18/2021, 12:21 PM

Pierre Monico
08/18/2021, 1:31 PM
prefect run CLI --module option?
• Doing python -m flows.myflow works
• Doing prefect run -m flows.myflow raises No module named 'flows'
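(A hedged guess at a workaround rather than a confirmed fix: if the CLI resolves the module from the current interpreter's import path, making the project root importable may be enough. The path below is a placeholder:)

```shell
# assumption: flows/ lives directly under the project root and contains __init__.py
cd /path/to/project-root
export PYTHONPATH="$PWD"
prefect run -m flows.myflow
```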
Kim Pevey
08/18/2021, 3:21 PM
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
I’m not sure how to debug this. What are the possible causes of this error?

Paolo
08/18/2021, 4:06 PM
@task(name="A small task")
def small_task(some_params):
    # stuff happens here
    return something

@task(name="A complex task with many steps")
def complex_task(lots_of_params):
    # stuff happens
    # more stuff happens
    small_task_result = small_task(some_params)
    # even more stuff happens which uses small_task_result
My question is: what happens if I nest tasks as above? I suspect Prefect would not like it, but I haven't set up a test infrastructure yet.
Of course, I could use small_task as a plain function (remove the @task decorator), or break complex_task up into smaller tasks, but both solutions would mean either leaving things as they are (awful) or rewriting a lot of code.
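(One pattern often suggested for exactly this situation, sketched under Prefect 1.x assumptions and untested here: inside another task's body, call the wrapped function through `.run()` instead of calling the task object, so no task nesting occurs and small_task stays usable as a real task elsewhere:)

```python
from prefect import task

# `small_task` as defined in the snippet above
@task(name="A complex task with many steps")
def complex_task(lots_of_params, some_params):
    # stuff happens
    # .run() executes small_task's body as a plain function,
    # without registering a nested task in the flow
    small_task_result = small_task.run(some_params)
    # even more stuff happens which uses small_task_result
```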
Adam Lancaster
08/18/2021, 4:37 PM

kiran
08/18/2021, 4:38 PM

Charles Leung
08/18/2021, 4:44 PM
# Registration Configs
flow.run_config = NomadRun(**prefect.config.run_config)
flow.storage = GitLab(path='flow.py', **prefect.config.storage)
Wilson Bilkovich
08/18/2021, 5:15 PM

Florian Kühnlenz
08/18/2021, 6:24 PM

YD
08/18/2021, 7:12 PM

Adam Shamlian
08/18/2021, 7:35 PM
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')
    # option 3 below: b_param.serializer = MyClassJSONSerializer() or similar
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result})
    # also wait for a_id, set A upstream of B, wait for b_id
Because Parameters are/return? PrefectResults, they must be JSON-serializable (indeed, if I just try the above I get a TypeError: MyClass is not JSON serializable, or similar). Is the expectation that I should be messing with:
1. the serialization of the LocalResult from a_do_something,
2. somehow injecting a serializer into Parameter (my attempts here have thus far failed; is this even possible?), or
3. modifying the b_param result (see the commented line in the child_B flow)?
Note: based on my reading of several docs pages regarding Results and Serializers, option 2 strikes me as the most intuitive place to look. That is, it seems Prefect wants you to solve this there.
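(Not from the thread; a pure-Python illustration of the constraint being hit, plus a fourth option: pass a plain-dict representation of the object as the parameter and rebuild it inside the child flow. MyClass here is a stand-in with made-up fields:)

```python
from dataclasses import dataclass, asdict

@dataclass
class MyClass:  # stand-in for the real class
    a: int
    b: int
    c: int

obj = MyClass(1, 2, 3)

# json.dumps(obj) would raise TypeError (MyClass is not JSON serializable),
# which is the same constraint the Parameter machinery surfaces.
payload = asdict(obj)         # what you'd pass as parameters={'a_result_value': ...}
rebuilt = MyClass(**payload)  # what b_do_something would do on the other side
print(rebuilt == obj)  # True
```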
Eddie Atkinson
08/19/2021, 2:32 AM
from prefect.run_configs import ECSRun
from prefect import task, Flow
from prefect.storage import S3
import prefect

TASK_ARN = "an arn"
STORAGE = S3(bucket="my-cool-bucket")
LABELS = ["a label"]

RUN_CONFIG = ECSRun(
    labels=LABELS,
    task_role_arn=TASK_ARN,
    image="prefecthq/prefect:latest-python3.8",
    memory=512,
    cpu=256,
)

@task(log_stdout=True)
def bye_world():
    print("bye world")

@task(log_stdout=True)
def hello_world():
    print("Hello, World!")

with Flow("test_flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    hello_world()
    bye_world()

flow.register(project_name="Tesla")
Michael Eggert
08/19/2021, 4:12 AM

Aiden Price
08/19/2021, 6:14 AM
resource_manager class, the flow times out and errors like this appear:
Error during execution of task: AttributeError("'NoneType' object has no attribute 'setup'")
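(Not a diagnosis, just the documented shape for reference: a Prefect 1.x resource_manager sketch, since a "'NoneType' object has no attribute 'setup'" error often means the manager object itself ended up as None at execution time. The connection helper is hypothetical:)

```python
from prefect import resource_manager

@resource_manager
class MyResource:
    def __init__(self, config):
        self.config = config

    def setup(self):
        # must return the resource that dependent tasks receive
        return make_connection(self.config)  # hypothetical helper

    def cleanup(self, resource):
        resource.close()
```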
Matias Godoy
08/19/2021, 10:57 AM
query {
  flow_run {
    name,
    created,
    state,
    flow_id,
    duration: <here goes the flow_run execution duration>
  }
}
Ideally I could do something like duration = end_time - start_time, but I think GraphQL does not allow operations like that.
I also checked the flow_run_aggregate query, but it does not contain such information.
The real problem is that I want to consume this data from Grafana, so I have nowhere to run these calculations; the data should come already calculated in the response.
Any ideas on how I could do this?
Thanks a lot!
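(Not from the thread; if a small middle layer between the API and Grafana is acceptable, the arithmetic itself is simple. A sketch assuming flow runs come back with ISO 8601 start_time/end_time fields; the record below is fabricated:)

```python
from datetime import datetime

# Fabricated flow_run record with ISO 8601 timestamps.
run = {
    "start_time": "2021-08-19T10:00:00+00:00",
    "end_time": "2021-08-19T10:03:30+00:00",
}

start = datetime.fromisoformat(run["start_time"])
end = datetime.fromisoformat(run["end_time"])
duration_s = (end - start).total_seconds()
print(duration_s)  # 210.0
```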
Italo Barros
08/19/2021, 11:43 AM
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:852)'),))