Pedro Machado
10/30/2020, 2:36 AM

ale
10/30/2020, 8:41 AM

Sque
10/30/2020, 11:04 AM
… yield other tasks as dependents.
I was looking for similar functionality, and the closest I found is LOOP, but the problem is that it permits only one direct dependency while I need to create multiple.
I also tried to represent the problem at the flow level, but unfortunately that does not work in my case: the task dependencies depend on the values of parameters, so they are not known until the flow is executed.
Any ideas on the Prefect way to tackle this?
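(For reference, the LOOP signal mentioned above reruns a single task, carrying state between iterations through prefect.context — a minimal sketch of that documented pattern, with a made-up count_up task:)

import prefect
from prefect import task, Flow
from prefect.engine.signals import LOOP

@task
def count_up(limit):
    # the previous iteration's result is carried in prefect.context
    payload = prefect.context.get("task_loop_result", {})
    n = payload.get("n", 1)
    if n >= limit:
        return n  # returning normally ends the loop
    raise LOOP(message=f"Iteration {n}", result=dict(n=n + 1))

with Flow("loop-example") as flow:
    count_up(5)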
Joël Luijmes
10/30/2020, 2:55 PM
from prefect import Flow
from prefect.tasks.postgres.postgres import PostgresExecute, PostgresFetch

with Flow("PostgreSQL -> BigQuery Sync") as flow:
    max_product_id = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        # query='SELECT MAX(id) from products'
        query='SELECT 188000',
    )
    query_products = PostgresFetch(
        'manager',
        'postgres',
        'localhost',
        15432,
        fetch='all',
        # query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10'
    )
    products = query_products(query=f'SELECT * FROM products WHERE id > {max_product_id} LIMIT 10')

state = flow.run()
print(state.result[max_product_id].result)
print(state.result[query_products].result)
print(state.result[products].result)
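One gotcha in the snippet above: the f-string is evaluated when the flow is built, so {max_product_id} expands to the Task object's repr rather than the fetched value. A sketch of one way to defer the formatting to runtime (build_query is a hypothetical helper):

from prefect import task

@task
def build_query(max_id) -> str:
    # runs at flow-run time, when max_id is the actual fetched value
    return f'SELECT * FROM products WHERE id > {max_id} LIMIT 10'

# inside the Flow block:
#   products = query_products(query=build_query(max_product_id))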
Edison A
10/30/2020, 5:34 PM
with Flow("epex_scraping", schedule=schedule) as flow:
    """Main definition of all Data pipeline steps"""
    report_names = scrape_for_file_names()
    for report_name in report_names:
        # extract
        report_xml = get_xml_files(report_name)
        report_json = get_xml_jsons(report_xml)
        # transform
        public_trades_collection = generate_public_trades(report_json)
        # load
        write_to_public_trades_db(public_trades_collection)

flow.register('project_x')
flow.run()
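A Python for loop over report_names won't iterate the runtime list here, since report_names is still a Task when the flow is built. A sketch of the mapped equivalent, assuming the helper functions are @task-decorated as in the snippet above:

with Flow("epex_scraping", schedule=schedule) as flow:
    report_names = scrape_for_file_names()
    # extract: one task run per report name, fanned out at runtime
    report_xmls = get_xml_files.map(report_names)
    report_jsons = get_xml_jsons.map(report_xmls)
    # transform
    public_trades = generate_public_trades.map(report_jsons)
    # load
    write_to_public_trades_db.map(public_trades)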
Joseph Haaga
10/30/2020, 6:43 PM
… newspaper.Article objects (from newspaper3k) that I would like to analyze with a spaCy model. Is unmapped an appropriate way to pass the spacy model to the task without initializing it each time?
e.g.
from typing import List, Set

import spacy
from newspaper import Article
from prefect import Flow, task, unmapped

@task
def get_articles() -> List[Article]:
    ...
    return articles

@task
def load_spacy():
    return spacy.load("en_core_web_md")  # this is a slow operation

@task
def extract_organizations(article: Article, nlp) -> Set:
    return nlp(article.text).ents

with Flow("Extract Orgs from News Articles"):
    articles = get_articles()
    nlp = load_spacy()
    extract_organizations.map(articles, nlp=unmapped(nlp))
Chris White
Brian Mesick
10/30/2020, 8:05 PM
… 0.12.6 to 0.13.3, the container no longer has git installed, causing pip installs from git to fail. We use a shared library that gets pip installed from git, so this makes us sad. I'm not seeing a way to inject a build step to install git, or instructions on how to add one, so I'm a bit stuck at the moment. Has anyone else run into this?
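One workaround, sketched under the assumption that a custom base image is acceptable (the registry and image names below are hypothetical): bake git into the base image and point Docker storage at it.

from prefect.environments.storage import Docker

# Base image built from a Dockerfile along the lines of:
#   FROM prefecthq/prefect:0.13.3
#   RUN apt-get update && apt-get install -y --no-install-recommends git \
#       && rm -rf /var/lib/apt/lists/*
storage = Docker(
    registry_url="registry.example.com/team",
    base_image="registry.example.com/team/prefect-git:0.13.3",
    python_dependencies=["git+https://github.com/example/shared-lib.git"],
)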
Joseph Haaga
10/30/2020, 9:11 PM
ValueError: Your docker image failed to build! Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
Does python_dependencies take care of the pip install for me? Or do I have to provide a custom Dockerfile that does the COPY requirements.txt and RUN pip install -r requirements.txt?
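As far as I know, python_dependencies is a list of pip-installable requirements that Docker storage installs during the image build, so the COPY/RUN pair shouldn't be needed for plain packages — a minimal sketch:

from prefect.environments.storage import Docker

# each entry is handed to pip inside the image build;
# `flow` is the Flow object being deployed
flow.storage = Docker(
    python_dependencies=["newspaper3k", "spacy"],
)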
Jasono
10/30/2020, 10:10 PM

Newskooler
10/30/2020, 10:44 PM

Cristian Bob
10/30/2020, 11:51 PM
… DaskExecutor.
In one of my tests I am running a flow locally using the DaskExecutor. The associated dask-scheduler as well as the dask-worker are started on my local computer. The problem is that the tasks' logged messages are printed in the dask-worker's terminal instead of the terminal where I started the flow. Is there a way to get them printed in the latter terminal? I looked into the DaskExecutor's documentation for an argument like return_stdout, but I couldn't find anything relevant. Can you help me, please?
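I'm not aware of a DaskExecutor flag for this; one possible workaround (an assumption, not a documented Prefect feature) is to pull each worker's recent log output back through the dask client after the run:

from distributed import Client
from prefect.engine.executors import DaskExecutor

scheduler = "tcp://127.0.0.1:8786"  # hypothetical local scheduler address
client = Client(scheduler)

state = flow.run(executor=DaskExecutor(address=scheduler))  # flow as above

# get_worker_logs() maps each worker address to [(level, message), ...]
for worker, logs in client.get_worker_logs().items():
    print(worker)
    for level, message in logs:
        print(f"  [{level}] {message}")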
bral
10/31/2020, 4:25 AM

Joël Luijmes
10/31/2020, 4:30 PM

Radek Tomsej
10/31/2020, 7:47 PM

Avi A
11/01/2020, 2:02 PM

Mariusz Olszewski
11/01/2020, 3:11 PM

Mariusz Olszewski
11/01/2020, 3:11 PM

Mariusz Olszewski
11/01/2020, 3:12 PM

Priyan Chandrapala
11/01/2020, 4:48 PM

Jasono
11/02/2020, 3:17 AM

Klemen Strojan
11/02/2020, 10:04 AM

Kilian
11/02/2020, 12:14 PM
prefect.engine.signals.TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.')
I would like to get the actual error message of the upstream task, in this case:
<Failed: "Unexpected error: NameError("name 'asd' is not defined")">

import prefect
from prefect import task, Flow

@task(trigger=prefect.triggers.any_failed)
def any_failed(data):
    logger = prefect.context.get('logger')
    print(data)
    logger.info("Failed! Error message: %s", data)

@task()
def fail():
    asd  # NameError, raised on purpose
    return None

@task
def process(data):
    return True

with Flow('test') as f:
    data = fail()
    result = process(data)
    any_failed(result)
    f.set_reference_tasks([result])

state = f.run()
And I would like to get the upstream error message in any_failed.
I was also looking into state_handlers, but I would prefer this solution, as it gives a single point to catch any errors.
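For comparison, a sketch of the state_handlers route (log_failure is a hypothetical name; in Prefect 0.x a Failed state's .result holds the exception):

import prefect
from prefect import task

def log_failure(task_obj, old_state, new_state):
    # a single handler that can be reused across tasks
    if new_state.is_failed():
        logger = prefect.context.get("logger")
        logger.info("Task %s failed with: %r", task_obj.name, new_state.result)
    return new_state

@task(state_handlers=[log_failure])
def fail():
    asd  # NameError on purpose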
Maura Drabik
11/02/2020, 3:55 PM
Beginning health checks...
System Version check: OK
Traceback (most recent call last):
  File "/opt/prefect/healthcheck.py", line 121, in <module>
    flows = cloudpickle_deserialization_check(flow_file_path)
  File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
    flows.append(cloudpickle.load(f))
ModuleNotFoundError: No module named 'utils'
Removing intermediate container 1128b2fe7075
The command '/bin/sh -c python /opt/prefect/healthcheck.py '["/opt/prefect/flows/scm-validation-flow.prefect"]' '(3, 6)'' returned a non-zero code: 1
Any guidance on how to resolve? Thanks in advance.
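The ModuleNotFoundError usually means the pickled flow references a local module (utils) that was never copied into the image. A sketch of one common fix, with hypothetical paths:

from prefect.environments.storage import Docker

storage = Docker(
    # copy the local module into the image...
    files={"/repo/flows/utils.py": "/opt/prefect/utils.py"},
    # ...and make its directory importable inside the container
    env_vars={"PYTHONPATH": "/opt/prefect"},
)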
Joël Luijmes
11/02/2020, 4:39 PM

Joseph Haaga
11/02/2020, 5:00 PM

Avi A
11/02/2020, 7:21 PM

Brian Mesick
11/02/2020, 10:05 PM

Michelle Wu
11/03/2020, 5:24 AM