Marwan Sarieddine
02/06/2022, 3:43 PM
Sergi de Pablos
02/07/2022, 7:07 AM
vinoth paari
02/07/2022, 8:02 AM
prefect backend cloud to log in.
rishika kumari
02/07/2022, 8:06 AM
Carmen Marcos
02/07/2022, 10:16 AM
Carlos Paiva
02/07/2022, 10:52 AM
Ricardo Gaspar
02/07/2022, 2:04 PM
boto3 and awswrangler APIs (as I’ve seen other users asking for it on the community channel).
My current issue is that I don’t understand how Flows are rendered in the Schematic view. It’s related to task dependencies and inter-task communication (passing values).
Bruno Murino
02/07/2022, 2:14 PM
Fabrice Toussaint
02/07/2022, 2:33 PM
ValueError("Couldn't infer a flow in the current context")
Ryan Brennan
02/07/2022, 2:39 PM
Projects. Right now we just have one big project for all of our flows. Is there any benefit to breaking them out further? Is it possible to do things like “run all flows in project X” or are projects only for organizational aesthetics?
Ken Nguyen
02/07/2022, 5:17 PM
Heeje Cho
02/07/2022, 5:20 PM
import prefect
from prefect import task

@task
def generator(limit: int):
    logger = prefect.context.get("logger")
    for i in range(limit):
        logger.info(i)
        yield i * i
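The question attached to this snippet was not preserved in the export. As a plain-Python note with no Prefect involved: calling a function that contains `yield` returns a lazy generator object, so the loop body (including the logging call) does not run until something iterates it, and the generator object itself cannot be pickled. Assuming that is the difficulty here, returning a list is one simple workaround. The names `squares_lazy` and `squares_eager` below are hypothetical.

```python
import pickle


def squares_lazy(limit):
    """Generator version: nothing runs until the result is iterated."""
    for i in range(limit):
        yield i * i


def squares_eager(limit):
    """List version: the loop runs immediately and the result is plain data."""
    return [i * i for i in range(limit)]


gen = squares_lazy(4)

# A generator object cannot be pickled, so it cannot be stored or passed
# between processes the way an ordinary return value can.
try:
    pickle.dumps(gen)
    generator_pickles = True
except TypeError:
    generator_pickles = False

# A list survives a pickle round trip unchanged.
eager = squares_eager(4)
restored = pickle.loads(pickle.dumps(eager))
```

If the values really need to be produced one at a time, another option is to keep the generator inside the task and only return the collected results.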
Nick Hart
02/07/2022, 6:17 PM
flow_if_success or flow_if_failure. Basically, I want to know how I can take the final state signal from conditional-flow and decide whether I want to run flow_if_success or flow_if_failure. I was having trouble matching the task signal with a value so that the case works properly. I was hoping you would be able to help me out. Would I need to use the get_task_run_result or is there a better way I can just grab the task result? Here is a snippet of my current test code:
var1 = Parameter("var1", default=4)
var2 = Parameter("var2", default=12)
conditional_id = StartFlowRun(flow_name="Conditional-Flow", project_name="Test", wait=True)
flow_if_success_id = StartFlowRun(flow_name="Flow_if_success", project_name="Test", wait=True)
flow_if_failure_id = StartFlowRun(flow_name="Flow_if_failure", project_name="Test", wait=True)

# Idea: Conditional flow runs first. If it succeeds with Success signal, run flow_if_success, if it fails with failure signal, run flow_if_failure
with Flow("Conditional FoF") as parent_flow:
    conditional_run = conditional_id()
    with case(conditional_run, Success):  # Conditional_run never matches Success even if successful task run
        flow_if_success_run = flow_if_success_id(parameters=dict(num=var1))
    with case(conditional_run, Failed):
        flow_if_failure_run = flow_if_failure_id(parameters=dict(number=var2))
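One possible reading of the symptom above is that `case` compares the value a task returns, while success or failure is a property of the run's state, so the two never line up. The sketch below is a plain-Python model of that distinction and does not use Prefect; every name in it (`run_and_capture`, `conditional_ok`, `conditional_bad`, `choose_next`) is hypothetical. In Prefect 1.x, state-based branching is usually expressed with triggers such as `all_successful` and `any_failed` from `prefect.triggers`, which is not shown here.

```python
def run_and_capture(fn):
    """Run fn and return (state, value); state is 'Success' or 'Failed'."""
    try:
        return "Success", fn()
    except Exception as exc:  # any exception counts as a failed run
        return "Failed", exc


def conditional_ok():
    return "flow-run-id-123"  # hypothetical return value, not a state


def conditional_bad():
    raise RuntimeError("child flow failed")


def choose_next(state):
    """Pick the follow-up from the outcome state, not from the return value."""
    return "flow_if_success" if state == "Success" else "flow_if_failure"


state_ok, value_ok = run_and_capture(conditional_ok)
state_bad, value_bad = run_and_capture(conditional_bad)

# Comparing the returned *value* with a state name never matches.
value_matches_state = (value_ok == "Success")

next_after_ok = choose_next(state_ok)
next_after_bad = choose_next(state_bad)
```

The design point is only that the branch decision keys off the state. How that maps onto a specific orchestrator's API is an assumption to verify against its documentation.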
Andrea Haessly
02/07/2022, 6:23 PM
Brian Phillips
02/07/2022, 7:55 PM
Tim Enders
02/07/2022, 8:40 PM
Clients have non-trivial state that is local and unpickleable.
The load still seems to run OK, but it is marked as failure. How can I get rid of this error?
David Yang
02/07/2022, 9:20 PM
Vamsi Reddy
02/08/2022, 12:08 AM
Daniel Saxton
02/08/2022, 1:19 AM
Ovo Ojameruaye
02/08/2022, 7:58 AM
Amichai Ben Ami
02/08/2022, 12:16 PM
db-upgrade] Error applying Hasura metadata from /prefect-server/services/hasura/migrations/metadata.yaml
I could not find anything that changed from our side.
any idea?
Thanks 🙏
Antonio Manuel BR
02/08/2022, 12:58 PM
subflow_res.result[deployed_model].result). When I wrote the code, I read the docs, knowing this way was not valid for remote executions. I would like to know the proper way to gather results when working in a remote distributed environment.
Can anyone help me?
Brett Naul
02/08/2022, 1:19 PM
Thomas Pedersen
02/08/2022, 2:37 PM
Bruno Murino
02/08/2022, 2:54 PM
Archie Kennedy-Dyson
02/08/2022, 3:09 PM
Andrew Lawlor
02/08/2022, 3:24 PM
Matthew Seligson
02/08/2022, 5:42 PM
Madison Schott
02/08/2022, 8:19 PM
re_data notify slack \
--start-date 2021-01-01 \
--end-date 2021-01-31 \
--webhook-url https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX \
--subtitle="[Optional] Markdown text to be added as a subtitle in the slack message generated"
Tom Shaffner
02/08/2022, 8:42 PM
Tom Shaffner
02/08/2022, 8:42 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3, retry_delay=timedelta(minutes=randint(5,25)), cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug(f"Oracle pull completed; returning...")
    return df

with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    cache_invalid = Parameter('cache_invalid', default=False)
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=ORACLE_QUERY_SQL_PATH, server=ORACLE_SERVER, cache_flag=cache_invalid)
    logger.debug('Oracle pull returned.')
    df = set_data_types(df)
    create_data_summary_artifacts(df)
    u = upload_to_table(df, destination_table=DATA_DESTINATION_TABLE_NAME, append=False)
    add_indices_to_table(upstream_tasks=[u], destination_table=DATA_DESTINATION_TABLE_NAME, column_list=INDEX_COLUMNS)

flow.run_config = LocalRun(
    env={
        'PREFECT__LOGGING__FORMAT': '[%(asctime)s-%(levelname)s - %(name)s]-[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s',
        'PREFECT__LOGGING__LEVEL': 'DEBUG',
    },
    working_dir=file_path,
    labels=["normal-process"],
)
flow.executor = LocalDaskExecutor(scheduler='threads', num_workers=8)
flow.register(project_name=PROJECT_NAME, parameters={'cache_invalid': False}, idempotency_key=(FLOW_NAME + datetime.now().strftime("%Y%m%d-%H%M%S")))
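To see what the logging format string in the run config above produces, here is a small standalone sketch using only the standard `logging` module. It copies the format string from the snippet; the logger name `format-demo` and the function `pull` are hypothetical, and this does not show how Prefect itself applies the setting.

```python
import io
import logging

# Format string copied from the PREFECT__LOGGING__FORMAT value above.
LOG_FORMAT = (
    "[%(asctime)s-%(levelname)s - %(name)s]-"
    "[%(filename)s:%(funcName)s]-Line %(lineno)d: %(message)s"
)

# Capture output in memory so the formatted line can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(LOG_FORMAT))

demo_logger = logging.getLogger("format-demo")  # hypothetical logger name
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
demo_logger.addHandler(handler)


def pull():
    demo_logger.debug("Oracle pull completed; returning...")


pull()
line = stream.getvalue().strip()
print(line)
```

Each record then carries the timestamp, level, logger name, source file, function name and line number ahead of the message, which helps when tracing which function emitted a line.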
Kevin Kho
02/08/2022, 8:45 PM
Tom Shaffner
02/08/2022, 8:45 PM
pip install -U prefect
the above is what I get as the most current one
Kevin Kho
02/08/2022, 8:48 PM
pip install prefect==1.0rc1. The -U just gets the latest release (not pre-release)
Tom Shaffner
02/08/2022, 8:48 PM
Kevin Kho
02/08/2022, 8:51 PM
Tom Shaffner
02/08/2022, 8:52 PM
Kevin Kho
02/08/2022, 8:58 PM
Tom Shaffner
02/08/2022, 8:59 PM
Kevin Kho
02/08/2022, 8:59 PM
Michael Adkins
02/08/2022, 9:00 PM
Tom Shaffner
02/08/2022, 9:01 PM
Michael Adkins
02/08/2022, 9:02 PM
Tom Shaffner
02/08/2022, 9:04 PM
Michael Adkins
02/08/2022, 9:05 PM
Tom Shaffner
02/08/2022, 9:06 PM
Michael Adkins
02/08/2022, 9:09 PM
Tom Shaffner
02/08/2022, 9:09 PM
Michael Adkins
02/08/2022, 9:11 PM
Tom Shaffner
02/08/2022, 9:12 PM
Michael Adkins
02/08/2022, 9:12 PM
Tom Shaffner
02/08/2022, 9:12 PM
Michael Adkins
02/08/2022, 9:12 PM
Tom Shaffner
02/08/2022, 9:14 PM
Michael Adkins
02/08/2022, 9:17 PM
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Attaching process based timeout handler...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Sending execution to a new process...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Waiting for process to return with 10800s timeout...
8 February 2022,03:15:18 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Executing...
8 February 2022,03:31:10 prefect INFO Rows returned in oracle data query: 96,562...
8 February 2022,03:31:10 prefect DEBUG Oracle pull completed; returning...
8 February 2022,03:31:10 prefect.CloudTaskRunner DEBUG Task 'Pull Oracle Data': Execution successful.
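As a quick check on the log excerpt above, the arithmetic below compares the time between "Executing..." and "Execution successful." with the 10800s timeout. It is a small standalone sketch; the timestamps and the timeout are copied from the messages above, and the helper name `parse` is hypothetical.

```python
from datetime import datetime

TIMEOUT_SECONDS = 3 * 60 * 60  # matches timeout=(3*60*60) in the task decorator


def parse(stamp):
    """Parse the timestamp format used in the log lines above."""
    return datetime.strptime(stamp, "%d %B %Y,%H:%M:%S")


started = parse("8 February 2022,03:15:18")   # "Executing..."
finished = parse("8 February 2022,03:31:10")  # "Execution successful."

elapsed = (finished - started).total_seconds()
remaining = TIMEOUT_SECONDS - elapsed
print(f"elapsed={elapsed:.0f}s of {TIMEOUT_SECONDS}s, remaining={remaining:.0f}s")
```

By this reading the task body finished in under 16 minutes, well inside the three-hour limit, so the timeout itself does not appear to have expired in this run.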
Tom Shaffner
02/08/2022, 9:18 PM
Michael Adkins
02/08/2022, 9:18 PM
Tom Shaffner
02/08/2022, 9:18 PM
Michael Adkins
02/08/2022, 9:18 PM
Tom Shaffner
02/08/2022, 9:19 PM
Michael Adkins
02/08/2022, 9:19 PM
Tom Shaffner
02/08/2022, 9:20 PM
Michael Adkins
02/08/2022, 9:20 PM
Tom Shaffner
02/08/2022, 9:21 PM
Michael Adkins
02/08/2022, 9:24 PM
PREFECT__LOGGING__LEVEL=DEBUG prefect run --name "hello-world" --project "example" --execute
Tom Shaffner
02/08/2022, 9:25 PM
Michael Adkins
02/08/2022, 9:27 PM
Tom Shaffner
02/08/2022, 9:27 PM
Michael Adkins
02/08/2022, 9:28 PM
type(return_val) call would have to fail which seems very unlikely
Tom Shaffner
02/08/2022, 9:28 PM
Michael Adkins
02/08/2022, 9:28 PM
Tom Shaffner
02/08/2022, 9:28 PM
Michael Adkins
02/08/2022, 9:29 PM
Tom Shaffner
02/08/2022, 9:29 PM
Michael Adkins
02/08/2022, 9:29 PM
Tom Shaffner
02/08/2022, 9:29 PM
Michael Adkins
02/08/2022, 9:30 PM
Tom Shaffner
02/08/2022, 9:30 PM
Michael Adkins
02/08/2022, 9:31 PM
Tom Shaffner
02/08/2022, 9:32 PM
Michael Adkins
02/08/2022, 9:33 PM
Tom Shaffner
02/08/2022, 9:33 PM
Michael Adkins
02/08/2022, 9:36 PM
Tom Shaffner
02/08/2022, 9:42 PM
Michael Adkins
02/08/2022, 9:44 PM
Tom Shaffner
02/08/2022, 9:44 PM
Michael Adkins
02/08/2022, 9:45 PM
Tom Shaffner
02/08/2022, 9:45 PM
Michael Adkins
02/08/2022, 9:45 PM
Tom Shaffner
02/08/2022, 9:46 PM
Michael Adkins
02/08/2022, 9:48 PM
Tom Shaffner
02/08/2022, 9:51 PM
@task(name='Pull Oracle Data', cache_for=timedelta(hours=10), timeout=(3*60*60), max_retries=3, retry_delay=timedelta(minutes=randint(5,25)), cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_oracle_data_via_with_pickle(oracle_query_sql, server='INTP', cache_flag=False) -> DataFrame:
    if cache_flag:
        logger.info('Task flagged to invalidate/ignore cache.')
    logger.info(f"Initiating Oracle Pull using {oracle_query_sql}...")
    with Oracle_DB(server=server) as o_db:
        df = o_db.pull_data_from_oracle(oracle_query_sql)
    data_amount = len(df.index)
    logger.info(f"Rows returned in oracle data query: {data_amount:,}...")
    assert data_amount != 0  # Cause task failure if 0 is returned
    logger.debug(f"Oracle pull completed; returning...")
    pickled_val = cloudpickle.dumps(df)
    logger.debug(f"Pickling successful")
    return df
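The pickling probe in the task above can be factored into a small helper that reports either the pickled size or the error. The sketch below uses the standard `pickle` module as a stand-in for `cloudpickle`, and plain Python values as stand-ins for the DataFrame; `check_picklable` is a hypothetical name, not part of any library.

```python
import pickle
import threading


def check_picklable(value):
    """Return (ok, detail): pickled size on success, error text on failure."""
    try:
        data = pickle.dumps(value)
    except Exception as exc:
        return False, f"{type(exc).__name__}: {exc}"
    # Round-trip to confirm the bytes can also be loaded back.
    pickle.loads(data)
    return True, f"{len(data)} bytes"


# Plain data pickles without trouble.
ok_rows, detail_rows = check_picklable([{"id": 1}, {"id": 2}])

# Objects holding OS-level state, such as locks, do not.
ok_lock, detail_lock = check_picklable(threading.Lock())
```

Logging the returned detail string next to the task's other debug messages would show whether serialization of the return value is the step that fails.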
Michael Adkins
02/08/2022, 10:37 PM
Tom Shaffner
02/09/2022, 2:43 PM
Kevin Kho
02/09/2022, 2:47 PM
Michael Adkins
02/09/2022, 3:15 PM
df.to_json() from your task instead of the raw data.
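The suggestion above amounts to making the task's return value a plain string. As a rough illustration that avoids a pandas dependency, the sketch below uses a list of dicts as a hypothetical stand-in for the query result and the standard `json` module in place of `df.to_json()`. The real task would call the DataFrame method instead.

```python
import json
import pickle

# Hypothetical stand-in for the query result; the real task returns a DataFrame.
rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

# Returning a JSON string keeps the task's return value a plain str,
# which serializes without depending on the original object's internals.
as_json = json.dumps(rows)
pickled_json = pickle.dumps(as_json)

# A downstream task would decode the string back into data.
decoded = json.loads(pickle.loads(pickled_json))
```

The trade-off is an extra encode and decode step, and any type information that JSON cannot represent has to be restored downstream.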