Jason Motley
04/06/2022, 11:12 PM
main section of a standard python script?

Shiyu Gan
04/07/2022, 2:11 AM

Alan Ning
04/07/2022, 2:28 AM

Moss Ebeling
04/07/2022, 6:08 AM

Ricardo Gaspar
04/07/2022, 12:11 PM
2.0b2. Do you know where the resource_manager is located now? Before I was importing it with: from prefect import resource_manager
https://docs.prefect.io/core/idioms/resource-manager.html
CC @Anna Geller @Kevin Kho
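For reference, the 1.x resource manager idiom from that docs page looks roughly like this; a minimal sketch using sqlite as a stand-in resource (the class name and query are made up for illustration):

import sqlite3
from prefect import Flow, resource_manager, task

@resource_manager
class SQLiteConnection:
    def __init__(self, path):
        self.path = path

    def setup(self):
        # runs once before dependent tasks; returns the shared resource
        return sqlite3.connect(self.path)

    def cleanup(self, conn):
        # runs after all dependent tasks finish, even on failure
        conn.close()

@task
def run_query(conn):
    return conn.execute("SELECT 1").fetchone()

with Flow("resource-example") as flow:
    with SQLiteConnection(":memory:") as conn:
        run_query(conn)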

Robert Holmes
04/07/2022, 1:38 PM
from prefect import Flow

def prefect_flow():
    with Flow('cloud_reporting_etl') as flow:
        # jobs_df, extract_data, and load_data_to_s3 are defined elsewhere
        for index, row in jobs_df.iterrows():
            job_name = row['Job Name']
            query = row['Query']
            filename = row['Filename']
            extracted = extract_data(query)
            load_data_to_s3(extracted, filename)
    return flow

Bruno Nunes
04/07/2022, 2:39 PM

Ricardo Gaspar
04/07/2022, 3:45 PM
Running 2.0b2 and starting the server (on an EC2 machine) with an IP on a different port (self-hosted). I'm able to connect to it via browser (I've set all the security groups and routes to allow me to connect to it in a browser). However, it seems that internally Prefect Orion is still interacting with the default host and port, so the UI doesn't render all the info (see screenshot in the thread). Is there something extra I need to change?
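For later readers: besides the --host/--port flags on the server, the client and UI look up the API location from Prefect settings, so something along these lines may be needed (setting names may differ between betas; the IP is a placeholder):

# serve the API on an externally reachable interface
prefect orion start --host 0.0.0.0 --port 4200

# point clients at the EC2 address instead of the default localhost API
export PREFECT_API_URL="http://<ec2-public-ip>:4200/api"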

Anders Segerberg
04/07/2022, 5:12 PM

Horatiu Bota
04/07/2022, 5:46 PM
I set cache_validator=prefect.engine.cache_validators.never_use on the task, but that doesn't seem to do the trick (running prefect core locally).

Kevin Schaper
04/07/2022, 6:46 PM
I tried the @task(name="{val}") syntax, but it didn't look like the templating syntax was getting interpreted. (I wound up with "{val}" in all of my task names.)
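If this is 1.x: as far as I can tell the name kwarg is static, and task_run_name is the templated one; it is rendered from the task's call-time inputs (and shown in the Cloud/Server UI rather than in local runs). A sketch:

from prefect import Flow, task

# task_run_name (not name) is formatted with the task's inputs per run
@task(task_run_name="process-{val}")
def process(val):
    return val

with Flow("templated-run-names") as flow:
    process.map(val=["a", "b", "c"])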

John Ramey
04/07/2022, 6:51 PM
Is it OK to return a fitted scikit-learn object from a task? Any gotchas if I then pass it to a second task which is responsible for calling .predict() using that same object? Reason I'm asking: I thought I read somewhere in the prefect docs to be careful with state in such cases, but I can't seem to find it.
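For what it's worth, a fitted estimator flows between tasks like any other return value (serialized with cloudpickle when it crosses process boundaries); a minimal sketch with toy data:

import numpy as np
from prefect import Flow, task
from sklearn.linear_model import LogisticRegression

@task
def train_model():
    X, y = np.arange(20).reshape(10, 2), np.array([0, 1] * 5)
    return LogisticRegression().fit(X, y)  # the fitted model is the task result

@task
def make_predictions(model):
    X_new = np.arange(10).reshape(5, 2)
    return model.predict(X_new)

with Flow("train-and-predict") as flow:
    model = train_model()
    preds = make_predictions(model)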

Wei Mei
04/07/2022, 8:20 PM
prefect register --project $PREFECT_PROJECT_NAME --path flows/ --label prod --no-schedule

Naga Sravika Bodapati
04/08/2022, 6:57 AM

Moss Ebeling
04/08/2022, 7:23 AM
I import shared tasks with: from mypackage.utils import frequently_used_task, where my flow is also inside of mypackage. Running outside of the docker container that has mypackage installed means I hit an import error immediately.

kevin
04/08/2022, 7:36 AM
When I call set_task_result() it fails with the error message 'State payload is too large.', which makes sense.
My Prefect infrastructure is sitting on a KubernetesJobEnvironment. My understanding is that to store this large result correctly I should follow the documentation here: https://docs.prefect.io/core/concepts/results.html#result-objects
Are there any additional considerations I should take into account when setting up correct result storage with this infrastructure?
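For reference, the task-level result setup from that docs page looks roughly like this (the bucket name is a placeholder); it should apply the same way on a KubernetesJobEnvironment, though that's worth verifying:

from prefect import task
from prefect.engine.results import S3Result

# the large return value is written to S3 instead of the state payload
@task(result=S3Result(bucket="my-results-bucket"))
def produce_large_result():
    return list(range(10_000_000))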

Alex Rogozhnikov
04/08/2022, 8:09 AM

Olivér Atanaszov
04/08/2022, 12:23 PM

Stephen Lloyd
04/08/2022, 12:43 PM
@task
def get_table() -> object:
    # ... query table
    result: pandas.DataFrame = cursor.fetch_dataframe()
    return result

@task
def load_to_s3(result: object) -> None:
    awswrangler.s3.to_csv(
        df=result,
        path='s3://bucket/folder/table.csv'
    )

with Flow(...) as flow:
    get_data = get_table()
    save_data = load_to_s3(get_data)
    ...
I'd like to now extend this to somehow pass a list of tables from the same database to extract and load to s3. Is this possible given this simple template?
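Task mapping is the usual way to fan out over a list in 1.x; a sketch along the lines of the template above (table names are placeholders, and the task bodies are elided):

from prefect import Flow, Parameter, task

@task
def extract_table(table_name):
    ...  # query one table and return a DataFrame

@task
def load_table_to_s3(df, table_name):
    ...  # e.g. write to s3://bucket/folder/{table_name}.csv

with Flow("extract-load-many") as flow:
    tables = Parameter("tables", default=["table_a", "table_b"])
    frames = extract_table.map(tables)
    load_table_to_s3.map(frames, tables)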

Daniel
04/08/2022, 2:14 PM
I'm having trouble passing arguments to DbtShellTask.
Here's some mock code:
from prefect import Flow, Parameter
from prefect.tasks.dbt.dbt import DbtShellTask

dbt_shell_task = DbtShellTask()

with Flow("nightly_dbt_flow") as flow:
    env = Parameter('env', default='dev')
    run_task = dbt_shell_task(command='dbt run',
                              environment='dev')

flow.run()
I receive a TypeError that the keyword argument environment doesn't exist:
Traceback (most recent call last):
  File "nightly_dbt_flow.py", line 50, in <module>
    environment='dev')
  File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 662, in __call__
    *args, mapped=mapped, upstream_tasks=upstream_tasks, flow=flow, **kwargs
  File "/Users/daniel/git-repos/bi-prefect/.python/lib/python3.7/site-packages/prefect/core/task.py", line 702, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3015, in bind
    return args[0]._bind(args[1:], kwargs)
  File "/Users/daniel/.pyenv/versions/3.7.10/lib/python3.7/inspect.py", line 3006, in _bind
    arg=next(iter(kwargs))))
TypeError: got an unexpected keyword argument 'environment'
Why would environment cause an error but not command? When I remove environment, the task does get attempted without a TypeError. environment and command are both arguments defined in the DbtShellTask class.
I'm on prefect version 1.2.0.
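A likely explanation (worth checking against the 1.2.0 source): kwargs passed when calling a task inside a Flow block are bound to the task's run() signature, and DbtShellTask.run() accepts command while environment is an __init__-only argument. Passing it at construction time would look like this:

from prefect import Flow
from prefect.tasks.dbt.dbt import DbtShellTask

# environment is bound when the task is constructed...
dbt_shell_task = DbtShellTask(environment='dev')

with Flow("nightly_dbt_flow") as flow:
    # ...while command is accepted by run(), so it can be bound at call time
    run_task = dbt_shell_task(command='dbt run')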

Kevin Weiler
04/08/2022, 4:08 PM

Adi Gandra
04/08/2022, 5:18 PM

Gustavo Puma
04/08/2022, 5:50 PM
with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
    bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")
    silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")
    silver.set_upstream(bronze)
    gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")
    gold.set_upstream(silver)
    bronze(databricks_conn_secret=conn)
    silver(databricks_conn_secret=conn)
    gold(databricks_conn_secret=conn)

flow.run()
I want to run my tasks in the sequence bronze > silver > gold. While this executes, gold or silver are being started before bronze. I don't know if I'm misunderstanding how set_upstream works 🤔 Thanks in advance
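One possible culprit: in 1.x, calling a task instance inside a Flow block creates and binds a copy of that task, so set_upstream on the original objects doesn't order the copies that actually receive the connection secret. Setting the dependency on the calls themselves avoids this (a sketch reusing the JOB_ID constants from the snippet above):

from prefect import Flow
from prefect.tasks.databricks import DatabricksRunNow
from prefect.tasks.secrets import PrefectSecret

with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")
    bronze_run = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")(
        databricks_conn_secret=conn
    )
    silver_run = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")(
        databricks_conn_secret=conn, upstream_tasks=[bronze_run]
    )
    gold_run = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")(
        databricks_conn_secret=conn, upstream_tasks=[silver_run]
    )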

Patrick Tan
04/08/2022, 6:48 PM

wiretrack
04/08/2022, 9:53 PM
Is it possible to call flow.run() within a Celery task, for example? Anyone?
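flow.run() is a plain blocking call, so nothing obviously prevents invoking it from inside a Celery task; a minimal sketch (the broker URL is a placeholder):

from celery import Celery
from prefect import Flow, task

app = Celery("tasks", broker="redis://localhost:6379/0")

@task
def say_hi():
    print("hi")

with Flow("embedded") as prefect_flow:
    say_hi()

@app.task
def run_prefect_flow():
    # runs the flow in-process inside the Celery worker
    state = prefect_flow.run()
    return state.is_successful()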

Ben Epstein
04/09/2022, 12:52 PM
from prefect import flow

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

    @flow
    def runit(self):
        print(f"I have been called {self.count} times")
        self.count += 1

c = MyClass()
c.runit()
I get the error:
TypeError: missing a required argument: 'self'
Is that expected, or a potential bug in Orion?

Ben Epstein
04/09/2022, 12:53 PM
I can get around it by calling c.runit(c), but that seems pretty wrong.
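One workaround that sidesteps the bound-method self problem (a sketch, not necessarily the officially supported pattern): decorate a module-level function that takes the instance explicitly.

from prefect import flow

class MyClass:
    def __init__(self, ct: int = 0):
        self.count = ct

def runit(obj: MyClass):
    print(f"I have been called {obj.count} times")
    obj.count += 1

# wrap the plain function rather than a bound method
runit_flow = flow(runit)

c = MyClass()
runit_flow(c)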

Carlos Soza
04/09/2022, 2:57 PM

Ken Nguyen
04/09/2022, 4:56 PM

Michael Law
04/11/2022, 8:23 AM

Michael Law
04/11/2022, 8:23 AM
def run(self, flow: Flow, scheduler="threads", num_workers=8):
    env = {
        "DATABRICKS_CLUSTER_ID": self.cluster_id,
        "MOUNT": self.mount,
        "CODEMOUNT": self.codeMount,
        "ENVIRONMENT": self.environment,
        "RELEASE_VERSION": self.release_version,
        "APP_IMAGE": self.kubernetes_job_image,
        "AZURE_STORAGE_CONNECTION_STRING": self.storage_connection,
        "PREFECT__CONTEXT__SECRETS__DATABRICKS_CONNECTION_STRING": f"'{self.databricks_connection}'",
        "PREFECT_PROJECT": self.prefect_project,
        "DATABRICKS_INSTANCE_POOL_ID": self.instance_pool_id,
        "SQL_SERVER_JDBC_CONNECTION_STRING": self.sql_jdbc_connection_string,
    }
    flow.executor = LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)
    # 'DEBUG' is not set on the K8s cluster, so use .get() to avoid a KeyError there
    if os.environ.get("DEBUG") == "1":
        flow.run()
    elif os.environ.get("DEBUG") == "2":
        flow.visualize()
    else:
        flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=env)
        flow.storage = Azure(container="fdpflows", connection_string=self.storage_connection)

Anna Geller
04/11/2022, 9:22 AM
with Flow("your_flow", executor=LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers)) as flow:
    ...
This may help, since the executor is retrieved from storage at runtime.

Michael Law
04/11/2022, 10:16 AM

Anna Geller
04/11/2022, 10:30 AM
export PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"
or, on the run config:
flow.run_config = KubernetesRun(image=self.kubernetes_job_image, env=dict(PREFECT__LOGGING__FORMAT="%(levelname)s - %(name)s - %(process)s - %(thread)s | %(message)s"))

Michael Law
04/11/2022, 10:32 AM

Anna Geller
04/11/2022, 2:09 PM