Adam Kelleher
07/27/2020, 7:16 PM
Wing Rider
07/27/2020, 9:06 PM
Antony Southworth
07/28/2020, 1:31 AM
Antony Southworth
07/28/2020, 2:15 AM
`PythonOperator`), but generally you don't want them too small or else things get sluggish due to scheduler overhead.
2. How does Prefect handle data for mapped tasks? For example, if I have a flow `t = lambda x: x + 1`, `r1 = t.map(range(10))`, `r2 = t.map(r1)`, my understanding is that Prefect would distribute the computation of `r1` across the workers, then collect the results into a list on the "central" machine (where `flow.run()` was invoked), then serialise each element again and send the elements to the workers in order to distribute the computation of `r2`. This seems a bit inefficient (we compute the results on the worker, serialise, send back to the central scheduler, deserialise, then serialise, send back to a worker, deserialise, and then do more work). (See the first sketch after this list.)
3. How do folks use `Task.map` in practice? For example, would it be weird to run `Task.map` for each row in a dataframe? I guess this is related to the first question; basically "how small is too small for tasks/`Task.map`?"
4. Is there any way to "re-run" old instances of tasks? E.g. if I had a scheduled process running daily, and I need to re-run the one from Wednesday last week, is there a convenient way of doing that in the UI? I guess I'm basically asking "can you do `airflow backfill`?".
5. How do people handle returning large results from Tasks? For example, if I have a task in my Flow that returns a 1GB dataframe, I don't really want that to be serialised and sent back to the "central" machine (where `flow.run()` was invoked), cause that machine might have other work to do (and I don't want to waste time on ser/de, or loading the entire thing into memory just to serialise it and send it to another worker). For Airflow, I usually store things on S3 and use XCom to tell other tasks where the result was stored. Would the "Prefect Way" of doing this be for my task to upload its results to S3 and return the path as the return value from the Task? (See the second sketch after this list.)
6. Is there any way to run Flows in a "streaming" or asynchronous fashion? For example, if my task could `yield` rows from a dataframe, and the downstream tasks accepted an `Iterator`. Again, just thinking in terms of memory usage, it would be nice to not require the entire dataframe loaded in memory.
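For reference, here is what the mapped flow described in question 2 looks like in Prefect's functional API. This is a minimal sketch; the flow name is chosen purely for illustration:

from prefect import Flow, task

@task
def t(x):
    return x + 1

with Flow("map-example") as flow:
    r1 = t.map(range(10))  # one mapped task run per element
    r2 = t.map(r1)         # a second layer of mapped runs over r1's results

state = flow.run()

And a sketch of the "upload to S3, pass only the key" pattern asked about in question 5. The bucket name, keys, and the boto3/pandas usage are assumptions for illustration, not taken from the thread:

import boto3
import pandas as pd
from prefect import Flow, task

BUCKET = "my-results-bucket"  # hypothetical bucket name

@task
def produce():
    df = pd.DataFrame({"x": range(5)})  # stand-in for the large dataframe
    local_path = "/tmp/result.parquet"
    df.to_parquet(local_path)
    key = "flows/example/result.parquet"
    boto3.client("s3").upload_file(local_path, BUCKET, key)
    return key  # only the small S3 key is passed between tasks

@task
def consume(key):
    local_path = "/tmp/input.parquet"
    boto3.client("s3").download_file(BUCKET, key, local_path)
    return len(pd.read_parquet(local_path))

with Flow("s3-handoff") as flow:
    consume(produce())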
Apologies if these are all covered in the docs; I thought I got through them pretty thoroughly but it's possible I may have missed some parts.
Sven Teresniak
07/28/2020, 9:09 AM
`LocalResult` as a flow's `result=mylocalresultinstance` to persist task results. The data is written to an NFS share which is available to the Dask worker and the agent (exact same mountpoint). With default settings (no `result` keyword in the flow definition) all worked well. Now I use some templating to better organize (and later clean up) results.
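For reference, a minimal sketch of this kind of setup: a LocalResult with a templated location attached at the flow level. The directory and template below are placeholders, assuming Prefect ~0.12:

from prefect import Flow, task
from prefect.engine.results import LocalResult

# Placeholders in the location template are filled from prefect.context at runtime.
nfs_result = LocalResult(
    dir="/mnt/nfs/prefect-results",  # hypothetical NFS mountpoint
    location="{flow_name}/{task_name}-{today}.prefect",
)

@task
def make_number():
    return 42

with Flow("local-result-example", result=nfs_result) as flow:
    make_number()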
Questions:
1. The UI states that I cannot restart a task because of `Warning: If this flow run does not have a result handler, restarting is unlikely to succeed`. Is that text still based on the deprecated `result_handler` keyword, and maybe checking the wrong setting? Does the UI need the NFS share (the result location) as well? Or any Prefect-related service except the agent and the Dask worker?
2. Once my `LocalResult` is working: is it possible to access results from the UI? I know I can load results using the `Result` subclasses, but it would be easier for testing and debugging.
3. Is there an elegant way to get rid of old results? Deleting old flow versions (and their runs) does not remove results.
4. How do I find which keyword parameters I can use in the format string of a `Result`'s `location` string? Everything from `prefect.context` and all keywords a `Task.run()` got?
5. Is Prefect using (persisted) results in any way? Let's assume 1. is a bug and gets fixed. Does a retry of a failed task run read its input from (maybe persisted) result objects? Something else?
6. The documentation does not say a word about uniqueness. If Prefect is using results (see 5.), then each persisted result must be unique, right? That is, overwriting a result by accident could lead to complete fuckup?
Adam
07/28/2020, 1:28 PM
`Docker(registry_url=...)`, I get the following exception:
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', BadStatusLine('\x15\x03\x01\x00\x02\x02\n'))
Any ideas why that is? FWIW, I'm using Google Container Registry and I've already authenticated with it (running docker pull gcr.io/etc/etc/myprivateimage works fine). CircleCI does use a 'remote docker' to run such commands so perhaps this command isn't able to connect to it?
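For context, a minimal sketch of the kind of Docker storage configuration being referred to; the registry URL, image name, tag, and project name are placeholders, assuming Prefect ~0.12:

from prefect import Flow
from prefect.environments.storage import Docker

with Flow("docker-storage-example") as flow:
    pass

# Building and pushing the image needs a reachable Docker daemon (the step that
# appears to fail in the error above) plus push access to the registry.
flow.storage = Docker(
    registry_url="gcr.io/my-project",  # hypothetical GCR project
    image_name="my-flow",
    image_tag="latest",
)
flow.register(project_name="examples")  # the image is built during registration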
mithalee mohapatra
07/28/2020, 2:09 PM
karteekaddanki
07/28/2020, 7:20 PM
`LocalEnvironment(executor=DaskExecutor())`. Is this a limitation of the dev version of the cloud API? When using `LocalDaskExecutor` and running the flow with `flow.run()`, the tasks do execute concurrently. I am using Docker storage for the flows. How can I debug what's going on in this case?
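For reference, a minimal sketch of the two setups being compared: a DaskExecutor attached via LocalEnvironment for registered runs, versus a LocalDaskExecutor handed directly to flow.run(). The scheduler address is a placeholder, assuming Prefect ~0.12:

from prefect import Flow
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect.environments import LocalEnvironment

with Flow("executor-example") as flow:
    pass

# Registered/agent-driven runs: point the environment at a (hypothetical) Dask cluster.
flow.environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://dask-scheduler:8786")
)

# Local runs: a thread-based Dask executor passed straight to flow.run().
flow.run(executor=LocalDaskExecutor(scheduler="threads"))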
Skip Breidbach
07/28/2020, 9:28 PM
`cleanup()` method to detect if the tasks in the context succeeded or failed? I see the comment "A Task is automatically added to call the cleanup method (closing the Dask cluster) after all tasks under the context have completed." in the documentation, but can't see a way to access that created `Task`.
(I have a situation in which it would be convenient to execute only part of my `cleanup` task when something in that context fails.)
Adam Roderick
07/28/2020, 11:34 PM
Daniel
07/28/2020, 11:46 PM
# ETL Messages
extract_messages = etl_msg.extract_messages(company_ids)
process_messages = etl_msg.process_messages(extract_messages)
truncate_messages = etl_msg.truncate_messages(upstream_tasks=[process_messages])
load_messages = etl_msg.load_messages(
    process_messages, upstream_tasks=[truncate_messages]
)

# ETL Message Entity Mapping
extract_message_entity_mapping = etl_msg.extract_message_entity_mapping(
    extract_messages
)
process_message_entity_mapping = etl_msg.process_message_entity_mapping(
    extract_message_entity_mapping
)
truncate_message_entity_mapping = etl_msg.truncate_message_entity_mapping(
    upstream_tasks=[process_message_entity_mapping]
)
load_message_entity_mapping = etl_msg.load_messages(
    process_message_entity_mapping,
    upstream_tasks=[truncate_message_entity_mapping],
)
delphi
07/29/2020, 9:35 AM
Adam
07/29/2020, 2:00 PM
Adam
07/29/2020, 3:12 PM
Scott Zelenka
07/29/2020, 3:31 PM
`agent` container is failing liveness probes, and killing the Pod.
Warning Unhealthy 43s (x2 over 83s) kubelet, .internal Liveness probe failed: Get http://10.0.28.4:8080/api/health : dial tcp 10.0.28.4:8080: connect: connection refused
I'm chatting with AWS right now, and they're asserting that because kubelet is attempting to connect on port 8080, the Deployment YAML for the `agent` should be exposing that port. But I don't have this problem when deploying the same YAML on GCP, OpenShift, or bare-metal K8s.
David Elliott
07/29/2020, 5:27 PM
`prefect run cloud xxxx` from the CLI) - it’s giving me `There was a problem running your flow`. I’ve tried logging out/in as it suggests, but still the same issue.
The CLI returns
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
I had this issue with a big flow (770 tasks) this morning and assumed it was a size problem, so I switched to a smaller flow of 368 tasks which ran a couple of times ok, but I’ve just tried to run that same flow (368) again and it’s giving me the same error. Any thoughts / suggestions? Many thanks in advance!
Matthew Maldonado
07/29/2020, 5:28 PM
Michael Ludwig
07/29/2020, 6:39 PM
None
):
    end_state = flow.run(executor=executor,)
    for result in end_state.result:
        logger.info(f"Result: {result} -> {result.result}")
Flow run SUCCESS: all reference tasks succeeded
Result: <Task: user-to-cluster-snowflake-loader-free> -> None
Result: <Task: asp_ssm_parameter_writer> -> None
Result: <Task: mood_scorer> -> None
Result: <Task: rfy_predictor_FREE> -> None
Result: <Task: rfy_bucket_prefix_updater_Variants.main> -> None
Any ideas? Not using the newer `Result` construct, though it would be great to grab the return values at the end of the flow run.
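For reference, return values are normally read from the terminal state's result dict, which is keyed by task; each task's State carries the return value on its own .result attribute. A minimal sketch with placeholder names, assuming Prefect ~0.12:

from prefect import Flow, task

@task
def make_score():
    return 0.9

with Flow("results-example") as flow:
    make_score()

end_state = flow.run()
for task_ref, task_state in end_state.result.items():
    # task_state.result holds the task's return value; iterating the dict
    # directly yields only the Task objects, whose .result is not the value.
    print(f"{task_ref.name}: {task_state.result}")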
Amit
07/29/2020, 9:06 PM
Doran
07/29/2020, 9:43 PM
delphi
07/30/2020, 7:43 AM
Adam
07/30/2020, 8:49 AM
Adam
07/30/2020, 1:25 PM
`worker_spec_file` that includes env vars with `configMapKeyRef`?
Chris Martin
07/30/2020, 3:24 PM
Amit
07/30/2020, 4:26 PM
Mike Nerone
07/30/2020, 4:39 PM
Mac Gréco Péralte Chéry
07/30/2020, 4:43 PM
[2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
I would like to have
[2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0] [hospital name]': Starting task run...
I think this would be great when you visualize the flow run in the UI: you'd know from the task name which hospital the web scraping failed for, without going to the logs.
P.S. Currently I am logging the hospital name to know which hospital's data is currently being web-scraped:
@task(name="Scraping Institution Data")
def scrape_institution_data(instActionIdTuple):
    logger = prefect.context.get("logger")
    logger.info(f"Scraping for site: {instActionIdTuple[0]}")
So at runtime I get:
[2020-07-30 16:24:11] INFO - prefect.TaskRunner | Task 'Scraping Institution Data[0]': Starting task run...
[2020-07-30 16:24:11] INFO - prefect.Scraping Institution Data[0] | Scraping for site: Hôspital Claire Heureuse de Marchand Dessalines
Amit
07/30/2020, 7:15 PM
Traceback (most recent call last):
File "/opt/conda/envs/my_project/bin/prefect", line 10, in <module>
sys.exit(cli())
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/cli/execute.py", line 50, in cloud_flow
client = Client()
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/prefect/client/client.py", line 82, in __init__
self._access_token_expires_at = pendulum.now()
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/__init__.py", line 211, in now
dt = _datetime.datetime.now(local_timezone())
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/__init__.py", line 60, in local_timezone
return get_local_timezone()
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 35, in get_local_timezone
tz = _get_system_timezone()
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 63, in _get_system_timezone
return _get_unix_timezone()
File "/opt/conda/envs/my_project/lib/python3.7/site-packages/pendulum/tz/local_timezone.py", line 242, in _get_unix_timezone
raise RuntimeError("Unable to find any timezone configuration")
RuntimeError: Unable to find any timezone configuration
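A hedged aside on the error above: pendulum consults the TZ environment variable before probing the system for timezone files, so one possible workaround, assuming the flow's image simply lacks timezone data and Docker storage is in use, is to bake a TZ value into the image. The storage details below are placeholders:

from prefect import Flow
from prefect.environments.storage import Docker

with Flow("tz-example") as flow:
    pass

# The relevant part is the TZ environment variable; registry and image names are hypothetical.
flow.storage = Docker(
    registry_url="registry.example.com/team",
    image_name="my-flow",
    env_vars={"TZ": "UTC"},
)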
Skip Breidbach
07/30/2020, 10:34 PM
`secrets` to the flow storage. But when I do, I get a surprising (to me) error when the flow is executed:
Failed to load and execute Flow's environment: HTTPError('400 Client Error: Bad Request for url: http://host.docker.internal:4200/graphql')
It looks to me like the container thinks it's using the cloud backend or something? Any ideas?
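For context, a minimal sketch of the setup being described: secret names declared on the flow's storage so they are resolved at runtime inside the container. The storage type, registry, and secret name are assumptions, assuming Prefect ~0.12:

from prefect import Flow
from prefect.environments.storage import Docker

with Flow("storage-secrets-example") as flow:
    pass

flow.storage = Docker(
    registry_url="registry.example.com/team",  # placeholder
    image_name="my-flow",
    secrets=["AWS_CREDENTIALS"],  # secret names pulled from the backend at runtime
)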
Michael Ludwig
07/31/2020, 7:07 AM
Michael Ludwig
07/31/2020, 7:07 AM