Tom Shaffner
11/22/2021, 4:23 PM
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)
    if summary_view_name is not None and history_table_name is not None:
        logger.info("Initiating history upload process.")
        summary_df, summary_data_empty = pull_summary_data_via(sql=f"SELECT * FROM {summary_view_name}")
        if summary_data_empty:
            delete_today_from_history_if_exists(df=df, history_table=history_table_name)
            upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
    else:
        logger.info("Skipping summary view run: summary view name and/or history table name missing.")
To address this I tried to make the dependencies explicit by adding upstream_tasks arguments to two of the above lines, like so:
summary_df, summary_data_empty = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
This doesn't seem to fix the issue though; when I run the flow, later tasks still seem to initiate before the Oracle pull, which should occur before everything. Anyone see what I'm doing wrong? The documentation would seem to indicate that feeding result data from one task to another would make dependencies work correctly, but that doesn't seem to be happening here.
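(A sketch of one thing worth checking, reusing the task names from the snippet above: upstream_tasks=[upload_to_table] refers to the task function object, not the specific call made earlier in the flow block, so it may not express a dependency on that call at all. Capturing the values returned by each call and passing those is the pattern the docs' examples use.)
with Flow(flow_name) as flow:
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    # keep a handle on the upload call so later tasks can depend on it
    uploaded = upload_to_table(df, destination_table=data_destination_table_name)
    summary_df, summary_data_empty = pull_summary_data_via(
        sql=f"SELECT * FROM {summary_view_name}",
        upstream_tasks=[uploaded],      # runs only after the upload call above
    )
    delete_today_from_history_if_exists(
        df=df,
        history_table=history_table_name,
        upstream_tasks=[summary_df],    # runs only after the summary pull
    )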
Marko Herkaliuk
11/22/2021, 5:36 PM
David Yang
11/22/2021, 8:18 PM
Jason Motley
11/22/2021, 8:19 PM
Jason Motley
11/22/2021, 8:41 PM
extract => transform => delete 14 days in target => append last 22 days w/ no dupes
Right now I'm doing 2 parallel ETLs, one which ends in replacing the existing data and the other which appends. This seems very slow since I have to perform 2 full extracts.
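(A rough sketch of the single-extract chain described above, in Prefect 1.x terms; all task names here are placeholders rather than anything from the actual flow.)
from prefect import task, Flow

@task
def extract():
    ...

@task
def transform(df):
    ...

@task
def delete_last_14_days_in_target():
    ...

@task
def append_last_22_days(df):
    ...

with Flow("single-extract-etl") as flow:
    raw = extract()                 # one extract feeds everything downstream
    transformed = transform(raw)
    deleted = delete_last_14_days_in_target(upstream_tasks=[transformed])
    append_last_22_days(transformed, upstream_tasks=[deleted])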
Wieger Opmeer
11/22/2021, 10:47 PM
kwmiebach
11/22/2021, 11:22 PM
prefect orion start
- some error messages appear, including sqlite no such table 'flow_run'
- is there some db initialisation procedure that I missed?
kwmiebach
11/22/2021, 11:48 PM
Constantino Schillebeeckx
11/23/2021, 12:30 AM
1. I've set PREFECT__LOGGING__LEVEL="DEBUG" within the run_config (ECSRunner) and I've confirmed that this env is set as such in the running container. However, within my running flow, when I check os.environ it's set to INFO.
2. I set up a logger in my shared code that I later import like a library: logger = logging.getLogger("dwh.utils.main"). I've set PREFECT__CLOUD__SEND_FLOW_RUN_LOGS='true' and PREFECT__LOGGING__EXTRA_LOGGERS="['dwh']". When I execute a flow that uses that shared code, I can't see it emit logs in CloudWatch or in Prefect Cloud; however, when I run the flow locally I do see the logging statement.
What am I missing?
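(For reference, a minimal sketch of how these settings are often attached to the run config in Prefect 1.x, assuming the ECSRun config class and that the flow object is already defined; image, task definition, and other settings are omitted.)
from prefect.run_configs import ECSRun

# Sketch: pass the logging-related settings through the run config's env mapping
flow.run_config = ECSRun(
    env={
        "PREFECT__LOGGING__LEVEL": "DEBUG",
        "PREFECT__CLOUD__SEND_FLOW_RUN_LOGS": "true",
        "PREFECT__LOGGING__EXTRA_LOGGERS": "['dwh']",
    }
)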
Chris L.
11/23/2021, 4:10 AM
Matt Clapp
11/23/2021, 4:45 AM
supervisord monitoring their health."
What I'd like, and what my question is: is there documentation on just setting up a Docker image and having that be a self-contained container running a flow on AWS, without needing to kick it off from a local computer? I'd be happy to use Prefect Cloud for the UI. Is this a valid use case? Is there some reason there's not much documentation for it? (Or do I just not understand something basic?)
Thanks so much for any help.
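(For what it's worth, a rough sketch of the register-once pattern being asked about: the flow is baked into a Docker image, registered with Prefect Cloud, and an agent running in AWS launches the runs, so nothing is kicked off from a local machine. Registry, image, and project names below are placeholders.)
from prefect import task, Flow
from prefect.storage import Docker
from prefect.run_configs import ECSRun

@task
def say_hi():
    print("hi from inside the container")

with Flow("self-contained-flow") as flow:
    say_hi()

# Bake the flow into an image and push it to a registry the AWS-side agent can pull from
flow.storage = Docker(
    registry_url="<your-registry>",      # placeholder
    image_name="self-contained-flow",
)
flow.run_config = ECSRun()               # an ECS agent running in AWS picks up the runs
flow.register(project_name="demo")       # one-time registration with Prefect Cloud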
Gaylord Cherencey
11/23/2021, 6:48 AM
authenticator to the connect method (which doesn't seem to be possible at the moment in the task). Is my assumption correct, or is there a magic env variable I can use? If not, is it a change I can request or implement myself in the repository?
dammy arinde
11/23/2021, 2:14 PM
Jason Motley
11/23/2021, 4:32 PM
Maurits de Ruiter
11/23/2021, 4:45 PM
if status in ['SUCCEEDED', 'FAILED']:
fails with the following error:
TypeError: 'sequence' not supported between instances of 'str' and 'tuple'
If we log the types of the string and the array, it returns their types correctly.
Côme Arvis
11/23/2021, 5:50 PM
I have a task B which depends on an undefined-size list of other tasks [A1, A2, A3, ...] (Prefect therefore creates an implicit List task under the hood).
The thing is, some tasks in the list [A1, A2, A3, ...] can be skipped at runtime, but I still want B to be executed.
I currently can't achieve this, even if skip_on_upstream_skip=False is specified for B, since the implicit List task is skipped without being able to do anything (I receive None, and not a list of optional elements).
Is there a way to do it? Thanks!
Wesam Manassra
11/23/2021, 6:29 PM
When I pass a dockerfile to the prefect.environments.storage.Docker class, I get an error that looks like this:
shutil.Error "[Errno 63] File name too long: ['<Endless recursive path>']
Marwan Sarieddine
11/23/2021, 6:29 PM
Kevin
11/23/2021, 8:28 PM
Dotan Asselmann
11/24/2021, 10:29 AM
haf
11/24/2021, 10:49 AM
@task(
    nout=2,
    max_retries=10,
    retry_delay=timedelta(seconds=1),
)
def fetch_model_settings(
    dsn_params: DSNParams,
    app_id: UUID,
    default_model_group_id: UUID,
) -> ModelSettings:
    logger = prefect.context.get("logger")
    logger.info(
        f"Fetching model settings for app_id={app_id}, default_model_group_id={default_model_group_id}"
    )
    raise ValueError("WTF where are the logs")

# before flow:
prefect.config.logging.level = "DEBUG"

In the sample above I never get to see the WTF in the output of running the flow in the console/locally (PREFECT__LOGGING__LEVEL=DEBUG python flows/flow.py).
Manuel Gomes
11/24/2021, 11:49 AM
upload_file() call, because big&binary). Since it's synchronous, I can trust the file will be where desired when the task succeeds.
The next task in this flow is to transcode this video. I do so by creating a mediaconvert boto3 client and sending a mess of JSON to its create_job(**args) method. This returns me... the job.
Now, from what I've read, I should be able to use the Prefect built-in prefect.tasks.aws.client_waiter.AWSClientWait to wait for said job to finish (which is fine, at this point the workflow is serial/synchronous). The problem is, even when the job reports success (in the console, even!), it takes a while (minutes?!) for the transcoded movie to be present in the target bucket.
I would then need to enter another wait task until I could find the file in the bucket's list of objects, possibly through prefect.tasks.aws.s3.S3List, until I could proceed to do further things to this transcoded video?
This conjunction sounds all too common not to have an integrated solution, unless I'm being dense (hah! no news there!) and not spotting an obvious solution. Any guidance?
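(One way to block until the transcoded object actually shows up, sketched with boto3's built-in object_exists waiter rather than a Prefect task; bucket and key names below are made up.)
import boto3

s3 = boto3.client("s3")
waiter = s3.get_waiter("object_exists")
waiter.wait(
    Bucket="my-output-bucket",                      # placeholder target bucket
    Key="transcoded/video.mp4",                     # placeholder output key
    WaiterConfig={"Delay": 15, "MaxAttempts": 40},  # poll every 15s, up to ~10 minutes
)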
Marko Herkaliuk
11/24/2021, 3:00 PM
Emma Rizzi
11/24/2021, 3:32 PM
create_flow_run to launch part 2 synchronized with steps 1 and 3.
Considering I can configure some script to deploy the agent on VM start, is it possible? I'm concerned about the main flow run starting before the VM's agent exists.
If you have suggestions on better ways to implement this with Prefect, I'm interested! So far I've only used the basic ECS agent 🙂
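(A rough sketch of the orchestrator pattern being described, using Prefect 1.x's create_flow_run and wait_for_flow_run tasks; flow and project names are placeholders, and the VM-provisioning step is left out.)
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("orchestrator") as flow:
    # parts 1 and 3 run on the usual ECS agent; part 2 would be labeled for the VM's agent
    part1_id = create_flow_run(flow_name="part-1", project_name="my-project")
    part1_done = wait_for_flow_run(part1_id)

    part2_id = create_flow_run(
        flow_name="part-2", project_name="my-project", upstream_tasks=[part1_done]
    )
    part2_done = wait_for_flow_run(part2_id)

    part3_id = create_flow_run(
        flow_name="part-3", project_name="my-project", upstream_tasks=[part2_done]
    )
    wait_for_flow_run(part3_id)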
chicago-joe
11/24/2021, 3:41 PM
Michael Warnock
11/24/2021, 4:23 PM
André Petersen
11/24/2021, 4:28 PM
Ryan Brennan
11/24/2021, 4:38 PM
dbt compile before executing dbt run?
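(A minimal sketch of chaining the two commands with Prefect 1.x's DbtShellTask; profile and project configuration are omitted or hypothetical.)
from prefect import Flow
from prefect.tasks.dbt import DbtShellTask

dbt = DbtShellTask(profiles_dir=".")        # placeholder config

with Flow("dbt-compile-then-run") as flow:
    compiled = dbt(command="dbt compile")
    dbt(command="dbt run", upstream_tasks=[compiled])   # run only starts after compile finishes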
André Petersen
11/24/2021, 4:51 PM
Slackbot
11/24/2021, 5:22 PM
Slackbot
11/24/2021, 5:22 PM
Anna Geller
11/24/2021, 5:24 PM
Jason Motley
11/24/2021, 5:25 PM
Anna Geller
11/24/2021, 5:26 PM
Method 1:
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task(c_inputs, upstream_tasks=[a,b])
Method 2:
with Flow(...) as flow:
    a = first_task()
    b = second_task()
    c = third_task()
    c.set_upstream(b)
    c.set_upstream(a)
Jason Motley
11/24/2021, 5:27 PM
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1)
extract_2 = task2(stuff)
transformed = task3(stuff)
delete_stuff() # This works
load_2(connection=conn, data=transformed)
rewrite as:
extract_1 = task1(stuff)
load_1(connection=conn, data=extract_1, upstream_tasks=[extract_1])
extract_2 = task2(stuff, upstream_tasks=[extract_1, load_1])
etc.
Anna Geller
11/24/2021, 5:31 PM
from prefect import task, Flow

@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
Jason Motley
11/24/2021, 5:31 PM
Anna Geller
11/24/2021, 5:33 PM
Jason Motley
11/24/2021, 5:33 PM
[]
?
Anna Geller
11/24/2021, 5:33 PM
Jason Motley
11/24/2021, 5:34 PM
Anna Geller
11/24/2021, 5:34 PM
Jason Motley
11/24/2021, 5:34 PM
Anna Geller
11/24/2021, 5:35 PM
from prefect import task, Flow

@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[b])

flow.visualize()
Jason Motley
11/24/2021, 5:36 PM
Anna Geller
11/24/2021, 5:36 PM
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect import task, Flow

@task
def first_task():
    pass

@task
def second_task():
    pass

@task
def third_task():
    pass

with Flow("ex") as flow:
    a = first_task()
    b = second_task(upstream_tasks=[a])
    c = third_task(upstream_tasks=[a,b])

flow.visualize()
Jason Motley
11/24/2021, 5:38 PM
load(connection=connection_w, if_exists='append', ## Load new table
     db_table='DataTable',
     dataframe=new_transformed,
     upstream_tasks=[previous_task])
TypeError("load() missing 4 required positional arguments: 'connection', 'if_exists', 'db_table', and 'dataframe'")
Anna Geller
11/24/2021, 6:55 PM
In the with Flow() constructor you can only have tasks - are you sure that the load function is decorated with @task? It's described here: https://docs.prefect.io/core/concepts/tasks.html
Jason Motley
11/24/2021, 6:57 PM
Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')
Anna Geller
11/24/2021, 6:59 PM
Jason Motley
11/24/2021, 6:59 PM
Anna Geller
11/24/2021, 7:00 PM
import pandas as pd
from sqlalchemy import create_engine
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def load(df, connection_string, db_table="table", schema="schema"):
    engine = create_engine(connection_string)
    df.to_sql(db_table, schema=schema, con=engine, index=False)

@task
def get_df():
    return pd.DataFrame()

with Flow("ex") as flow:
    db_conn = PrefectSecret("DB_CONNECTION_STRING")
    df = get_df()
    load(df, db_conn)