Kevin
05/12/2022, 12:17 PMAndres
05/12/2022, 12:43 PMdbtShellTask
. The issue I have is that I have a handler which sends notifications with details on the failing flows but when dbt task fails with return code 1, all i get is 'Command failed with exit code 1'
is there a way to capture in a variable the logs of a failed shell task to then process them and display them using the handler?Sumant Agnihotri
05/12/2022, 1:34 PMwith Flow("flow-a") as flow_a:
a()
b()
flow = Flow("flow-a-b", tasks=[a, b])
flow.register(project_name="tester")
Is the code written in function a
and b
saved on the cloud, or does the cloud refers to my system every time it needs to run the flow on an agent? If it does refer to my system, will it make a difference if the agent is running on a different system?Florian Guily
05/12/2022, 2:47 PMRaviraja Ganta
05/12/2022, 3:07 PMBob Colner
05/12/2022, 3:37 PMSequentialTaskRunner
. Showing the most recent logs first would help usability.Sang Young Noh
05/12/2022, 3:50 PMJason
05/12/2022, 4:06 PMwith case(save_snowflake, True): # type: ignore
load_snowflake(
task_args=dict(name="Load Owners into Snowflake"),
data_payload=owners_df,
)
load_snowflake(
task_args=dict(name="Load Properties into Snowflake"),
data_payload=properties_df,
)
load_snowflake(
task_args=dict(name="Load Amenities into Snowflake"),
data_payload=amenities_df,
)
load_snowflake.map(
data_payload=tuple(reservations_df),
task_args=dict(name="Load Reservations into Snowflake"),
)
Florian Guily
05/12/2022, 4:10 PMMySqlExecute
task. I have no error (i had some but i fixed them) but the row i try to insert isn't showing in the table. I can do it with the same user from mysql workbench and the MySqlExecute
task return 1 as expected. I don't understand why, maybe i'm doing something wrong. Here is a code example of my task:
@task
def set_new_date():
logger = prefect.context.get("logger")
new_date = datetime.date.today().strftime("%Y-%m-%d")
query = f"""
INSERT INTO `name`.`table_name` (`name`, `value`) VALUES ('name', '{new_date}');
"""
result = MySQLExecute(
db_name="name",
user="user",
password="pwd",
host="host",
port=3306,
query=query).run()
<http://logger.info|logger.info>("New date is {}".format(new_date))
<http://logger.info|logger.info>("Query results is {}".format(result))
Michael Law
05/12/2022, 4:22 PMSean Harkins
05/12/2022, 5:42 PMAndrew Decker
05/12/2022, 8:37 PMLocalStorage
and KubernetesRun
, as shown in this example. We're trying to follow the same pattern in that example, and our setup registers flows in CI via the Prefect CLI when a PR is merged into the main branch.
When doing this, the flow is actually run while building in our CI environment (in this case, I just have a print statement in the flow to verify that it's running).
Is there a recommended pattern to register flows in CI without running them?Paco Ibañez
05/12/2022, 8:54 PMJason
05/12/2022, 9:14 PMSumant Agnihotri
05/12/2022, 9:29 PM@task
def slicer():
time.sleep(2)
op = []
for each in range(0, random.randint(1, 10)):
op.append(each)
return op
@task
def model_slide():
logger = prefect.context.get("logger")
time.sleep(2)
<http://logger.info|logger.info>(f'model c: {datetime.now().strftime("%H:%M:%S")}')
if __name__ == "__main__":
with Flow("processing-flow", executor=LocalDaskExecutor()) as flow:
list_of_slices = slicer()
for each in list_of_slices:
create_flow_run(flow_name="other-processing-flow", project_name="poc")
model_slide(upstream_task=[create_flow_run])
I'm getting the following error:
for each in list_of_slices:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
What am I doing wrong?Haleemur Ali
05/13/2022, 2:50 AMRaviraja Ganta
05/13/2022, 8:42 AMaccess_token
and key created via service accounts. Both of them did not work.Jeff Kehler
05/13/2022, 9:13 AMep
05/13/2022, 10:15 AMAndrew Moist
05/13/2022, 10:19 AMArthur Jacquemart
05/13/2022, 10:52 AMQuan Cao
05/13/2022, 11:04 AMprefect deployment create <deployment_name>
for an existed deployment, I get error prefect.exceptions.ObjectAlreadyExists
and no changes are made.
Please help me on how exactly do I make changes to my deployments.
Thank you.Tom Manterfield
05/13/2022, 11:31 AM2.0b4
, some great stuff in there. One thing that caught my eye was:
Futures from async tasks in sync flows are now marked as synchronousThe docs section for async tasks just says ‘Coming soon’. Is this a feature that’s already available and the docs are out of date or am I misunderstanding what is meant here?
Guilherme Petris
05/13/2022, 11:35 AMAnurag Bajpai
05/13/2022, 12:53 PMNaga Sravika Bodapati
05/13/2022, 2:00 PMJessica Smith
05/13/2022, 2:33 PMJohn Kang
05/13/2022, 2:44 PMJason
05/13/2022, 3:05 PMRETRY
, is it possible to queue an upstream task that had already passed? For example:
drop_glue_database = Parameter()
case(drop_glue_database, True):
drop_stuff()
save_to_parquet()
If save to parquet fails, I'd like the task to drop and rebuild the database, which is usually unnecessary and parameterized.JK
05/13/2022, 3:09 PMprefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'No value found for the requested key ("xxx") in tenant xxx', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]