Carlos Cueto
06/08/2022, 4:15 PM
Failed to load and execute Flow's environment: ValueError('No flows found in file.')
This is the flow definition:
if __name__ == '__main__':
    with Flow('Scouter-Solr-Script') as flow:
        snowflake_user = PrefectSecret('snowflake_usr')
        snowflake_pwd = PrefectSecret('snowflake_pwd')
        snowflake_to_solr(snowflake_user, snowflake_pwd)
    flow.run_config = LocalRun(labels=['SVRNAME1'])
    flow.storage = Git(repo="Prefect-Flows", flow_path="Python/Scouter/snowflake_to_solr.py", git_clone_url_secret_name="azure_devops_clone_url")
    flow.register(project_name='Scouter')
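A likely explanation, assuming the agent loads a script-stored flow by executing the file under a name other than `__main__`: everything inside the guard is skipped, so no Flow object is ever created. The sketch below imitates that with the standard library only. `Flow` and `find_flows` are hypothetical stand-ins for illustration, not Prefect's API.

```python
class Flow:
    """Hypothetical stand-in for prefect.Flow, used only for this illustration."""

    def __init__(self, name):
        self.name = name


def find_flows(source):
    """Execute script source in a fresh namespace and return the names of any Flow objects."""
    # The namespace has no "__name__" key, so the script does not run as "__main__".
    namespace = {"Flow": Flow}
    exec(source, namespace)
    return sorted(v.name for v in namespace.values() if isinstance(v, Flow))


# Flow built inside the guard: skipped when the file is loaded rather than run directly.
GUARDED = '''
if __name__ == "__main__":
    flow = Flow("Scouter-Solr-Script")
'''

# Flow built at module level: only main-only work stays under the guard.
MODULE_LEVEL = '''
flow = Flow("Scouter-Solr-Script")

if __name__ == "__main__":
    registered = True  # placeholder for flow.register(...) or a local flow.run()
'''

print(find_flows(GUARDED))
print(find_flows(MODULE_LEVEL))
```

Under that assumption, moving the `with Flow(...)` block to module level and keeping only registration or local runs under the guard would let the loader find the flow while still protecting the entry point for multiprocessing.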
I'm assuming it has to do with the if __name__ == '__main__' part on top of the Flow class definition, but I don't know how to go about fixing this. I need that for multiprocessing that happens within the main task of the flow.
Michał Augoff
06/08/2022, 4:54 PM
Robin Doornekamp
06/08/2022, 4:56 PM
Paco Ibañez
06/08/2022, 5:37 PM
YD
06/08/2022, 5:42 PM
aaron
06/08/2022, 6:55 PM
Ben Ayers-Glassey
06/08/2022, 7:14 PM
Tim Enders
06/08/2022, 7:41 PM
Derek Heyman
06/08/2022, 8:10 PM
Unexpected error: KeyError(0)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 681, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
FuETL
06/08/2022, 8:52 PM
An error occurred (ThrottlingException) when calling the RegisterTaskDefinition operation (reached max retries: 2): Rate exceeded
Does this mean that I should increase the rate limit on my AWS account?
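Before raising account limits, one common option for throttling errors is to retry with exponential backoff and jitter. The sketch below is generic and uses only the standard library. `ThrottledError`, `call_with_backoff`, and `flaky_register` are hypothetical names for illustration, not part of Prefect or any AWS SDK, and whether this applies depends on where the RegisterTaskDefinition call is made.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a throttling error such as ThrottlingException."""


def call_with_backoff(fn, max_attempts=5, base_delay=0.5, max_delay=8.0, sleep=time.sleep):
    """Call fn(), retrying on ThrottledError with exponential backoff and full jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except ThrottledError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            # Cap the exponential delay, then wait a random time below the cap.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))


# Demo: a hypothetical call that is throttled twice before it succeeds.
calls = {"count": 0}


def flaky_register():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ThrottledError("Rate exceeded")
    return "registered"


# Sleeping is disabled here so the demo finishes immediately.
result = call_with_backoff(flaky_register, sleep=lambda _seconds: None)
print(result, "after", calls["count"], "calls")
```

The jitter spreads retries out so that many flow runs starting at once do not all retry at the same moment.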
Volker L
06/08/2022, 9:03 PM
Robin
06/08/2022, 9:04 PM
Tim Enders
06/08/2022, 9:06 PM
TypeError: cannot unpack non-iterable PrefectFuture object
Do I just need to do @task(nout=2) in Prefect 2.0?
Tim Enders
06/08/2022, 9:17 PM
Jenia Varavva
06/08/2022, 11:08 PM
workspace the 2.0 cloud analogue to the 1.0 tenant?
Jose Daniel Posada Montoya
06/08/2022, 11:08 PM
Daniel Lomartra
06/08/2022, 11:31 PM
Khang Lam
06/08/2022, 11:35 PM
Something went wrong. Please wait a few moments and try again. Flow runs fine locally. Any idea how to troubleshoot this?
Carlos Cueto
06/09/2022, 3:56 AM
if __name__ == '__main__'. It all works fine when running locally with flow.run(), but it tells me it can't find a Flow when I deploy it to Cloud and run it from there.
Thomas Fredriksen
06/09/2022, 6:30 AM
DaskTaskRunner in Orion?
Joshua Greenhalgh
06/09/2022, 9:10 AM
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['secret_value'], 'message': 'An unknown error occurred.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Samuel Hinton
06/09/2022, 10:05 AM
Marcin Grzybowski
06/09/2022, 11:32 AM
Executing task: docker build --pull --rm -f "Dockerfile" -t myprefect:latest "." <
Sending build context to Docker daemon 101.5MB
Step 1/38 : ARG PYTHON_VERSION=3.8
Step 2/38 : ARG BUILD_PYTHON_VERSION=3.8
Step 3/38 : ARG NODE_VERSION=14
Step 4/38 : FROM node:${NODE_VERSION}-bullseye-slim as ui-builder
14-bullseye-slim: Pulling from library/node
Digest: sha256:f7137af1e34927cb3251a6e091edff4592c49a89422196680a2f087edcbc6e4d
Status: Image is up to date for node:14-bullseye-slim
---> e2eb23c871f2
Step 5/38 : WORKDIR /opt/orion-ui
---> Using cache
---> 74138fe30a86
Step 6/38 : RUN apt-get update && apt-get install --no-install-recommends -y chromium && apt-get clean && rm -rf /var/lib/apt/lists/*
---> Using cache
---> 77731060b4e3
Step 7/38 : RUN npm install -g npm@8
---> Using cache
---> ac23c1b14bea
Step 8/38 : COPY ./orion-ui/package*.json .
When using COPY with more than one source file, the destination must be a directory and end with a /
The terminal process "/usr/bin/bash '-c', 'docker build --pull --rm -f "Dockerfile" -t myprefect:latest "."'" terminated with exit code: 1.
Apostolos Papafragkakis
06/09/2022, 12:58 PM
Florian Guily
06/09/2022, 1:09 PM
Anna Geller
06/09/2022, 1:10 PM
Florian Guily
06/09/2022, 1:49 PM
from asyncio.log import logger
import prefect, pymongo, datetime
from prefect import task, Flow, flatten
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.mysql.mysql import MySQLFetch
from prefect.storage import GitHub
@task(nout=3)
def produce_output(input):
    return input*2-1, input*2, ["val"+str(input*2-1), "val"+str(input*2)]
@task
def reduce(a):
    return sum(a)
@task
def retry_post(data):
    logger = prefect.context.get("logger")
    logger.info(data)
with Flow("abcdef") as flow:
    success, fail, to_retry = produce_output.map([1, 2, 3, 4])
    total_success = reduce(success)
    total_fail = reduce(fail)
    retry_post.map(flatten(to_retry))
flow.run()
Samarth
06/09/2022, 2:35 PM
Local storage and Prefect Cloud
I want to use stored_as_script=True for my flow (pickle storage is not working with an SQLAlchemy object for some reason), and so I need to specify path, but that is giving me an error. Sharing the relevant code snippet and error.
with Flow(
    "eco_flow",
    schedule=schedule,
    storage=Local(stored_as_script=True, path='~/.prefect/flows/test-flow.py')
) as flow:
    cities = ['mumbai', 'new-delhi']
    for city in cities:
        data = get_traffic_data(city)
        clean_data = clean_traffic_data(data)
        insert_to_db(clean_data)
Error after registering the flow: Failed to load and execute flow run: ModuleNotFoundError("No module named '~/'")
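One thing worth checking, assuming the storage tests the path with a plain file-existence check and otherwise treats it as a module name: Python does not expand `~` on its own, so a path starting with `~/` is not seen as an existing file. The sketch below uses only the standard library, and `resolve_flow_path` is a hypothetical helper, not part of Prefect.

```python
import os


def resolve_flow_path(path):
    """Expand a leading '~' and return an absolute path."""
    return os.path.abspath(os.path.expanduser(path))


raw = "~/.prefect/flows/test-flow.py"
resolved = resolve_flow_path(raw)

# The raw string is a relative path with a directory literally named "~";
# the resolved one points into the current user's home directory.
print(raw, "->", resolved)
```

Passing the resolved value as `path` (or writing the absolute path out by hand) would rule this cause out.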
I guess I am defining the path incorrectly?
Tim Enders
06/09/2022, 2:39 PM
from prefect.task_runner import DaskTaskRunner
ModuleNotFoundError: No module named 'prefect.task_runner'
Is there an extra that needs to be installed?
Tim Enders
06/09/2022, 2:51 PM
@flow(name="Subscriptions Flow",
      task_runner=DaskTaskRunner())
def main():
    *snip*
    pages_list = get_pages_list(client, "subscriptions", params)
    for page in pages_list:
        item_list = get_items_list(client, "subscriptions", page)
if __name__ == "__main__":
    flow_result = main()
    print(flow_result)
But I get a RuntimeError from the multiprocessing. Can I get some help on what I am doing wrong with the DaskTaskRunner? I want to run in parallel over what is in pages_list.
Kevin Kho
06/09/2022, 2:53 PM
Tim Enders
06/09/2022, 2:54 PM
Michael Adkins
06/09/2022, 2:56 PM
pages_list.result()
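The idea behind calling `.result()` is that a task call returns a future, which is a handle to a value rather than the value itself, so it has to be resolved before it can be looped over or unpacked. The sketch below shows the same pattern with the standard library's `concurrent.futures` as an analogy only. It does not use Prefect, and the two functions are hypothetical stand-ins for the tasks in the flow above.

```python
from concurrent.futures import ThreadPoolExecutor


def get_pages_list():
    """Hypothetical stand-in for the get_pages_list task."""
    return [1, 2, 3]


def get_items_list(page):
    """Hypothetical stand-in for the get_items_list task."""
    return [f"item-{page}-{i}" for i in range(2)]


with ThreadPoolExecutor(max_workers=4) as pool:
    pages_future = pool.submit(get_pages_list)

    # The future itself is not iterable; it only wraps the eventual list.
    try:
        iter(pages_future)
        iterable = True
    except TypeError:
        iterable = False

    # Resolve the future first, then submit one call per page so they can run concurrently.
    pages = pages_future.result()
    item_futures = [pool.submit(get_items_list, page) for page in pages]
    items = [future.result() for future in item_futures]

print(iterable)
print(items)
```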
Tim Enders
06/09/2022, 2:57 PM