Anders Segerberg
04/07/2022, 5:12 PM

Horatiu Bota
04/07/2022, 5:46 PM
I tried setting cache_validator=prefect.engine.cache_validator.never_use on the task, but that doesn't seem to do the trick (running Prefect Core locally).
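(A possible angle, as a minimal sketch: it assumes Prefect 1.x local runs keep cached states in an in-memory dict on the context, so emptying it before the run forces tasks to re-execute.)

import prefect

# Assumption: in local Prefect Core runs, cache entries live in
# prefect.context.caches; emptying it discards any cached states so
# the next flow.run() re-executes every task.
prefect.context.caches = {}

flow.run()  # `flow` is the Flow object defined elsewhere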
Kevin Schaper
04/07/2022, 6:46 PM
I tried the @task(name="{val}") syntax, but it didn't look like the templating syntax was getting interpreted. (I wound up with "{val}" in all of my task names.)
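(A sketch of an alternative, assuming Prefect 1.x: the name kwarg is static, while task_run_name is templated from the task's inputs at run time and is reflected in Cloud/Server task run names.)

from prefect import task, Flow

# task_run_name is formatted with the task's call arguments when the
# task runs; name itself is never interpolated.
@task(task_run_name="process-{val}")
def process(val: str) -> str:
    return val.upper()

with Flow("templated-names") as flow:
    process("hello")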
John Ramey
04/07/2022, 6:51 PM
Is it OK to return a scikit-learn object from a task? Any gotchas if I then pass it to a second task which is responsible for calling .predict() using that same object? Reason I'm asking: I thought I read somewhere in the Prefect docs to be careful with state in such cases, but I can't seem to find it.
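(For concreteness, the pattern in question as a minimal runnable sketch; the model and data are made up.)

from prefect import task, Flow
from sklearn.linear_model import LinearRegression

@task
def fit_model(X, y):
    # The fitted estimator is returned as an ordinary task result.
    return LinearRegression().fit(X, y)

@task
def predict(model, X):
    # The downstream task receives the same object and calls .predict().
    return model.predict(X)

with Flow("train-and-predict") as flow:
    model = fit_model([[0.0], [1.0], [2.0]], [0.0, 1.0, 2.0])
    preds = predict(model, [[3.0]])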
Wei Mei
04/07/2022, 8:20 PM
prefect register --project $PREFECT_PROJECT_NAME --path flows/ --label prod --no-schedule
Naga Sravika Bodapati
04/08/2022, 6:57 AM

Moss Ebeling
04/08/2022, 7:23 AM
My flows do from mypackage.utils import frequently_used_task, where the flow itself is also inside of mypackage. Running outside of a docker container without mypackage installed means I hit an import error immediately.
kevin
04/08/2022, 7:36 AM
When I try to store a large task result with set_task_result(), it fails with the error message 'State payload is too large.', which makes sense.
My Prefect infrastructure is sitting on a KubernetesJobEnvironment. My understanding is that to store this large result correctly I should follow the documentation here: https://docs.prefect.io/core/concepts/results.html#result-objects
Are there any additional considerations I should take into account when setting up correct result storage with this infrastructure?
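(A minimal sketch of attaching a Result object to a task, per that docs page; the bucket name is made up and assumes Prefect 1.x with the AWS extras installed.)

from prefect import task, Flow
from prefect.engine.results import S3Result

# The Result object writes the return value to S3, so only a small
# reference travels in the state payload sent to the backend.
@task(result=S3Result(bucket="my-results-bucket"))
def build_large_payload() -> str:
    return "x" * 10_000_000

with Flow("large-result-flow") as flow:
    build_large_payload()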
Alex Rogozhnikov
04/08/2022, 8:09 AM

Olivér Atanaszov
04/08/2022, 12:23 PM

Stephen Lloyd
04/08/2022, 12:43 PM
I have a simple extract-and-load flow:
@task
def get_table() -> object:
    # ... query table
    result: pandas.DataFrame = cursor.fetch_dataframe()
    return result

@task
def load_to_s3(result: object) -> None:
    awswrangler.s3.to_csv(
        df=result,
        path='s3://bucket/folder/table.csv',
    )

with Flow(...) as flow:
    get_data = get_table()
    save_data = load_to_s3(get_data)
    ...
I’d like to now extend this to somehow pass a list of tables from the same database to extract and load to S3. Is this possible given this simple template?
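(One way to do that is task mapping; a minimal sketch with made-up table names and the query stubbed out.)

from prefect import task, Flow
import awswrangler
import pandas

@task
def get_table(table_name: str) -> pandas.DataFrame:
    # ... query the named table (stubbed here)
    return pandas.DataFrame({"table": [table_name]})

@task
def load_to_s3(result: pandas.DataFrame, table_name: str) -> None:
    awswrangler.s3.to_csv(df=result, path=f"s3://bucket/folder/{table_name}.csv")

with Flow("extract-many-tables") as flow:
    tables = ["customers", "orders", "invoices"]
    # .map() creates one task run per element of `tables`.
    frames = get_table.map(tables)
    load_to_s3.map(frames, tables)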
Daniel
04/08/2022, 2:14 PM
I'm having trouble passing arguments to DbtShellTask. Here's some mock code:
from prefect import Flow, Parameter
from prefect.tasks.dbt.dbt import DbtShellTask

dbt_shell_task = DbtShellTask()

with Flow("nightly_dbt_flow") as flow:
    env = Parameter('env', default='dev')
    run_task = dbt_shell_task(command='dbt run',
                              environment='dev')

flow.run()
I receive a TypeError that the keyword argument environment doesn't exist:
Traceback (most recent call last):
  File "nightly_dbt_flow.py", line 50, in <module>
    environment='dev')
  File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 662, in __call__
    *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 702, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3006, in _bind
    arg=next(iter(kwargs))))
TypeError: got an unexpected keyword argument 'environment'
Why would environment cause an error but not command? When I remove environment, the task does get attempted without a TypeError. environment and command are both arguments defined in the DbtShellTask class.
I'm on Prefect version 1.2.0.
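(A hedged guess at the cause, with a sketch: in Prefect 1.x, calling a task inside a Flow block binds the kwargs to the task's run() signature, and DbtShellTask.run() appears to accept command but not environment; environment is a constructor argument. If so, the fix is to pass it at init time.)

from prefect import Flow
from prefect.tasks.dbt.dbt import DbtShellTask

# environment is bound when the task is constructed; only run()-level
# kwargs such as command are valid when the task is called in the flow.
dbt_shell_task = DbtShellTask(environment='dev')

with Flow("nightly_dbt_flow") as flow:
    run_task = dbt_shell_task(command='dbt run')

flow.run()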
Kevin Weiler
04/08/2022, 4:08 PM

Adi Gandra
04/08/2022, 5:18 PM

Gustavo Puma
04/08/2022, 5:50 PM
I have this flow:
with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
    bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")
    silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")
    silver.set_upstream(bronze)
    gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")
    gold.set_upstream(silver)
    bronze(databricks_conn_secret=conn)
    silver(databricks_conn_secret=conn)
    gold(databricks_conn_secret=conn)

flow.run()
I want to run my tasks in the sequence bronze > silver > gold. When this executes, gold or silver are being started before bronze. I don't know if I'm misunderstanding how set_upstream works 🤔 Thanks in advance
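(A likely explanation, hedged: in Prefect 1.x, calling a task instance inside a Flow block creates a copy of that task, so the set_upstream calls above order the original task objects while the bound copies run unordered. A sketch that sets the dependencies on the bound calls instead, reusing the job-ID constants from the snippet above.)

from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksRunNow

with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
    # Bind each task first; the returned objects are the copies that
    # actually run, so dependencies must be set on these.
    bronze_run = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")(databricks_conn_secret=conn)
    silver_run = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")(databricks_conn_secret=conn)
    gold_run = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")(databricks_conn_secret=conn)
    silver_run.set_upstream(bronze_run)
    gold_run.set_upstream(silver_run)

flow.run()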
Patrick Tan
04/08/2022, 6:48 PM

wiretrack
04/08/2022, 9:53 PM
Is it possible to call flow.run() within a Celery task, for example? Anyone?

Ben Epstein
04/09/2022, 12:52 PM
from prefect import flow

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

    @flow
    def runit(self):
        print(f"I have been called {self.count} times")
        self.count += 1

c = MyClass()
c.runit()
I get the error:
TypeError: missing a required argument: 'self'
Is that expected, or a potential bug in Orion?

Ben Epstein
04/09/2022, 12:53 PM
I can work around it with c.runit(c), but that seems pretty wrong.
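(A workaround sketch, assuming early Orion only accepts plain functions as flows: hoist the flow to module level and have the method delegate to it.)

from prefect import flow

@flow
def run_flow(count: int):
    print(f"I have been called {count} times")

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

    def runit(self):
        # Delegate to the module-level flow instead of decorating
        # the bound method itself.
        run_flow(self.count)
        self.count += 1

MyClass().runit()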
Carlos Soza
04/09/2022, 2:57 PM

Ken Nguyen
04/09/2022, 4:56 PM

Michael Law
04/11/2022, 8:23 AM

Chris Keeley
04/11/2022, 1:09 PM
I'm using the wait_for_flow_run method for creating a flow of flows, but this seems designed to work with one tenant.
Claire Herdeman
04/11/2022, 1:39 PM
I'm hitting this error:
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
Is there a reliable way to avoid this, or a best practice for launching multiple versions of a flow?

Roger Webb
04/11/2022, 1:43 PM

Jelle Vegter
04/11/2022, 2:14 PM

Nico Neumann
04/11/2022, 2:39 PM

Chris Martinez
04/11/2022, 2:59 PM
prefect.io/flow_id=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxx
prefect.io/flow_run_id=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxx prefect.io/identifier=xxxxxxxx

Stephen Lloyd
04/11/2022, 3:39 PM
I set a couple of secrets via the Client:
>>> from prefect import Client
>>> client = Client()
>>> client.set_secret(name="test", value="test")
>>> client.set_secret(name="AWS_ACCOUNT_ID-dev", value="123456789098")
It seems like I should be able to use
from prefect.client import Secret
aws_account_id = Secret('AWS_ACCOUNT_ID-' + RUN_ENV).get()
However, I receive the following error:
ValueError: Local Secret "AWS_ACCOUNT_ID-dev" was not found.
How can I retrieve a secret value outside of a task?
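(A sketch of one possible fix, assuming Prefect 1.x with the Cloud backend: Secret.get() falls back to local secrets unless they are disabled, so turning off use_local_secrets should make it query the backend instead. The same setting can live under [cloud] in ~/.prefect/config.toml.)

import prefect
from prefect.client import Secret

# Assumption: with backend = "cloud" and local secrets disabled,
# Secret.get() fetches the value from Prefect Cloud instead of looking
# in the (empty) local prefect.context.secrets dict.
prefect.config.cloud.use_local_secrets = False

aws_account_id = Secret("AWS_ACCOUNT_ID-dev").get()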
Atsushi Saito
04/11/2022, 4:14 PM
A question about LocalRun: can I use another virtual environment's Python path, like a pyenv virtualenv or a conda path?