Diego Alonso Roque Montoya
although the prefect server/agent were running on a dedicated scheduler box. Am I missing something for configuring the workers to work properly? Do they need to point back at the scheduler's ip for logging? Do I need to run an agent on each machine to pass jobs to the workers?
, but not having any caching is annoying. Is there a way to work around this to enable caching for locally run tasks?
or some such. But the results seem to be centred around handling the writing/pickling of data for you? Ideally I'd like not to care whether it's an S3 prefix (for production) or a local directory (for debugging).
@task()
def save_results(dataframe):
    dataframe.to_parquet(UNIQUE_TASK_LOCATION)  # pandas writes parquet via to_parquet
    return UNIQUE_TASK_LOCATION
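To make concrete the kind of indirection I mean: the task would ask a helper for its location and never care what backend that resolves to. This is just a sketch under my own assumptions — `result_location`, `RESULT_PREFIX`, and the S3 branch are all hypothetical, not Prefect API:

```python
import os
import tempfile

# Hypothetical helper: pick the storage prefix from the environment, so the
# same task body works against S3 in production and a local dir in debugging.
def result_location(task_name, run_id):
    prefix = os.environ.get("RESULT_PREFIX", tempfile.gettempdir())
    return f"{prefix}/{task_name}/{run_id}.parquet"

def save_results(dataframe_bytes, task_name, run_id):
    location = result_location(task_name, run_id)
    if location.startswith("s3://"):
        # In production this would delegate to boto3/s3fs; stubbed out here.
        raise NotImplementedError("S3 upload not implemented in this sketch")
    os.makedirs(os.path.dirname(location), exist_ok=True)
    with open(location, "wb") as f:
        f.write(dataframe_bytes)
    return location
```

Then only the environment (not the task) decides where results land.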
prefect backend server
If I don't want "default" displayed in the UI (i.e. I just need one tenant, but don't want it named "default"), I need to add a new tenant. This is done through command 3.
prefect server start
This now creates a new, additional tenant. I only need one... Ideally, the steps from above would be 1, then 3, then 2. However, that order doesn't work because the server (database) has to be up for command 3 to work. Would it be useful to have the default tenant name as an optional argument to
prefect server create-tenant --name "Some Other Name"
? Or am I missing something w.r.t. the creation of my tenant?
prefect server start
from prefect import Task, task, Flow, Parameter
import pandas as pd
import os, sys
import prefect
import db as db
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor

class ETL(Task):
    def __init__(self):
        self.df = self.extract()

    def extract(self):
        read_conn = db.read_conn
        query = """SELECT b.oproduct_id, p.oproduct_id, p.oproduct_parent_id, b.obundle_parent_id
                   FROM hq.oproducts p
                   JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
                   WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query, read_conn)
        return df

    def load(self):
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv", index=False)

with Flow('flow_3', executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = ETL()
    df.load()

flow.register(project_name="tester")