Martin T
06/07/2022, 1:48 PM
from prefect import Flow, Parameter, case, task

@task
def mytask(p2):
    print(p2)

with Flow("case-demo") as flow:
    p1 = Parameter("p1", default=False)
    p2 = Parameter("p2", default="good")
    with case(p1, True):
        p2 = "bad"
    mytask(p2)

print(flow.tasks)
$ prefect build -p flow.py
UserWarning: Tasks were created but not added to the flow: {<Parameter: p1>, <Parameter: p2>}.
$ prefect run -p flow.py
...bad...
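A minimal sketch of why the run above can print "bad" whatever p1 is: the assignment inside the `with` block looks like ordinary Python that executes while the flow is being built, rather than a task that is skipped at run time. `fake_case` below is a hypothetical stand-in, not Prefect's `case`; it only shows that the body of a `with` block always runs.

```python
from contextlib import contextmanager


@contextmanager
def fake_case(condition, value):
    # Hypothetical stand-in for a deferred conditional block.
    # A context manager cannot skip its body; it only wraps it.
    yield


p2 = "good"
with fake_case(False, True):
    # Plain rebinding: runs immediately, whatever the condition is.
    p2 = "bad"

print(p2)  # bad
```

Under that reading, the conditional value would have to come from a task created inside the block, not from a plain assignment.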
Marcin Grzybowski
06/07/2022, 1:51 PM
-v /home/marcin/.prefect/orion.db:/root/.prefect/orion.db
When I run the Orion dashboards from the host:
prefect orion start
it works as intended: it shows flows run from the container.
But when I try to host the dashboards from a container, I get empty dashboards (also mounting orion.db, so both containers see the same database):
-v /home/marcin/.prefect/orion.db:/root/.prefect/orion.db
I added -p 4200:4200 to map the port from the host to the container, and exported PREFECT_ORION_API_HOST=0.0.0.0 so I can access the dashboards in the container from my host.
orion.db is properly mounted in both containers, because I see that it changes in them when I run a flow. Dashboards are accessible, but empty...
Florian Guily
06/07/2022, 2:22 PM
John O'Farrell
06/07/2022, 2:40 PM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Container.image should not be null or empty.
even though I know that the image URI that I'm using within containerDefinitions has content.
Dekel R
06/07/2022, 5:57 PM
@task()
def task_a():
    df = pd.DataFrame()
    ......
    logger.info(df.shape)  # 19M and 13 cols
    return df

@task()
def task_b(df):
    logger.info(type(df))  # NoneType

with Flow('****',
          storage=Docker(registry_url="us-central1-docker.pkg.dev****/",
                         dockerfile="./Dockerfile"),
          executor=LocalDaskExecutor(scheduler="processes")) as flow:
    df = task_a()
    task_b(df)
Malthe Karbo
06/07/2022, 6:13 PM
Jason Motley
06/07/2022, 7:54 PM
Ron Budnar
06/07/2022, 9:31 PM
@task
def get_dbt_kwargs(**kwargs):
    return kwargs

with Flow(name="dbt_command_flow", run_config=docker_run, storage=STORAGE) as flow:
    dbt_repo = Parameter("dbt_repo_url", ...)
    dbt_env = get_dbt_kwargs(
        DBT_ENV_SECRET__SQL_SERVER=PrefectSecret("DBT_ENV_SECRET__SQL_SERVER"),
        DBT_ENV_SECRET__SQL_DATABASE=PrefectSecret("DBT_ENV_SECRET__SQL_DATABASE"),
        DBT_ENV_SECRET__SQL_USER=PrefectSecret("DBT_ENV_SECRET__SQL_USER"),
        DBT_ENV_SECRET__SQL_PASSWORD=PrefectSecret("DBT_ENV_SECRET__SQL_PASSWORD"),
    )
    ...  # a couple of tasks omitted for brevity
    dbt_run = dbt(
        env=dbt_env,
        command=dbt_command,
        task_args={"name": "Run DBT Shell command"},
        upstream_tasks=[pull_task, dbt_env],
    )
My question is: is there a better/preferred way to do the above? Basically, I'm trying to ensure that if credentials/secrets are changed in Prefect Cloud, the next run picks them up correctly.
Jacob Bedard
06/08/2022, 12:00 AM
Andreas Nigg
06/08/2022, 6:59 AM
◦ If you are using an agent running on Kubernetes, update the Prefect image version to 2.0b6 in your Kubernetes manifest and re-apply the deployment.
Thanks for the update by the way - up until now, working with prefect is just awesome - I enjoy every minute of it.
Tarek
06/08/2022, 7:44 AM
prefect server create-tenant --name default --slug default
in the Dockerfile of my app, it doesn't work.
Why do other users have to install Prefect locally outside the containers and run the command there?
Is there no way to have the command somewhere in the Docker images?
Andreas
06/08/2022, 8:51 AM
Arthur Jacquemart
06/08/2022, 11:26 AM
Frederick Thomas
06/08/2022, 1:28 PM
Danilo Drobac
06/08/2022, 1:32 PM
prefect cloud login --key <API Key I Just Created>
but I keep getting an error that says:
Unable to authenticate. Please ensure your credentials are correct.
Am I missing something?
Nelson Griffiths
06/08/2022, 1:52 PM
Ollie Sellers
06/08/2022, 2:01 PM
Alexandru Anghel
06/08/2022, 2:11 PM
flow.register(project_name='my-project')
inside the Python code. The while loop in the second task blocks execution, so the register call is never reached. I want to register and run the flow using python file.py.
More code inside the thread.
Is there a way to reach the register command of the flow inside the Python code? Thank you!
Ilhom Hayot o'g'li
06/08/2022, 2:20 PM
Tim Helfensdörfer
06/08/2022, 2:52 PM
Tim Enders
06/08/2022, 3:15 PM
sqlite3.OperationalError: table flow already exists
Carlos Cueto
06/08/2022, 4:15 PM
Failed to load and execute Flow's environment: ValueError('No flows found in file.')
This is the flow definition:
if __name__ == '__main__':
    with Flow('Scouter-Solr-Script') as flow:
        snowflake_user = PrefectSecret('snowflake_usr')
        snowflake_pwd = PrefectSecret('snowflake_pwd')
        snowflake_to_solr(snowflake_user, snowflake_pwd)
    flow.run_config = LocalRun(labels=['SVRNAME1'])
    flow.storage = Git(repo="Prefect-Flows", flow_path="Python/Scouter/snowflake_to_solr.py", git_clone_url_secret_name="azure_devops_clone_url")
    flow.register(project_name='Scouter')
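On the `if __name__ == '__main__'` guard in the definition above, here is a rough sketch of how such a guard can hide a flow from whatever loads the file. It assumes the loader executes the file under a module name other than `__main__`, which is not confirmed here for Prefect. The strings stand in for Flow objects, and `names_visible_to_loader` and `loaded_by_tool` are hypothetical names used only for illustration.

```python
import os
import runpy
import tempfile

# Hypothetical flow files; a plain string plays the role of the Flow object.
GUARDED = '''
if __name__ == "__main__":
    flow = "Scouter-Solr-Script"
'''

MODULE_LEVEL = '''
flow = "Scouter-Solr-Script"

if __name__ == "__main__":
    print("registering", flow)  # only side effects stay behind the guard
'''


def names_visible_to_loader(source: str) -> dict:
    """Execute the source under a module name other than __main__."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "flow_file.py")
        with open(path, "w") as handle:
            handle.write(source)
        # run_name sets __name__ inside the executed file.
        return runpy.run_path(path, run_name="loaded_by_tool")


guarded_ns = names_visible_to_loader(GUARDED)
module_ns = names_visible_to_loader(MODULE_LEVEL)

print("flow" in guarded_ns)  # False
print("flow" in module_ns)   # True
```

If that assumption holds, one option is to build the flow at module level and keep only `flow.register(...)` and any multiprocessing entry point behind the guard.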
I'm assuming it has to do with the if __name__ == '__main__' part on top of the Flow class definition, but I don't know how to go about fixing this. I need that for multiprocessing that happens within the main task of the flow.
Michał Augoff
06/08/2022, 4:54 PM
Robin Doornekamp
06/08/2022, 4:56 PM
Paco Ibañez
06/08/2022, 5:37 PM
YD
06/08/2022, 5:42 PM
aaron
06/08/2022, 6:55 PM
Ben Ayers-Glassey
06/08/2022, 7:14 PM
Tim Enders
06/08/2022, 7:41 PM
Derek Heyman
06/08/2022, 8:10 PM
Unexpected error: KeyError(0)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 681, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
KeyError: 0
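The message that came with this traceback is cut off, so the actual cause is not known. One plausible reading of `upstream_state.result[i]` raising `KeyError: 0` is that the mapped upstream result was a dict rather than a list, so positional access by 0 is treated as a key lookup. The sketch below uses hypothetical data and a hypothetical helper `element_at` to mimic that access in plain Python.

```python
# Hypothetical upstream result: a dict instead of a list.
upstream_result = {"a": 1, "b": 2}


def element_at(result, i):
    """Mimic positional access like `upstream_state.result[i]`."""
    return result[i]


try:
    element_at(upstream_result, 0)
    error = None
except KeyError as exc:
    # 0 is looked up as a key, and the dict has no such key.
    error = repr(exc)

print(error)  # KeyError(0)

# Converting the dict to a list first makes positional access work.
as_list = list(upstream_result.items())
first = element_at(as_list, 0)
print(first)  # ('a', 1)
```

If that is what happened, returning a list (for example the dict's items) from the upstream task before mapping over it would be one thing to try.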