Kurt Rhee
08/09/2021, 2:05 PM
Madison Schott
08/09/2021, 2:55 PM
Marcus Hughes
08/09/2021, 11:05 PM
Prefect seems to be saving results to ~/.prefect/results without me specifically telling it to. That's fine in the short term, but after letting things run I discovered I had over 800 gigabytes of results. Is there an automated way built into Prefect to delete these after some given time elapses? I could just make another flow that cleans up results that are older than a day or something to keep us from ballooning out of drive space. Just curious what the best practice is here. Also, is it safe for me to just delete entries from this directory, or will it corrupt a database somewhere?
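(A minimal sketch of that cleanup-flow idea, not a built-in Prefect feature; the one-day retention window and the results path are assumptions.)

import time
from pathlib import Path

from prefect import task, Flow

RETENTION_SECONDS = 24 * 60 * 60  # assumed one-day retention window

@task
def prune_old_results():
    # Delete local result files under ~/.prefect/results older than the cutoff.
    results_dir = Path.home() / ".prefect" / "results"
    if not results_dir.exists():
        return
    cutoff = time.time() - RETENTION_SECONDS
    for path in results_dir.glob("*"):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()

with Flow("prune-local-results") as cleanup_flow:
    prune_old_results()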
Ryan Sattler
08/10/2021, 1:32 AM
home = os.environ.get("HOME")
if home != "/Users/ryan.sattler":
    s3_url = "http://localstack:4566"
else:
    s3_url = "http://localhost:31566"
flow.storage = S3(bucket="prefect-flows", key="hello-task.py", client_options={
    "endpoint_url": s3_url,
    "aws_access_key_id": "",
    "aws_secret_access_key": ""
})
However this doesn’t seem to work (the localhost url is always being used, so it works for registration but not at runtime), possibly because the value of flow.storage is getting baked-in at registration time? How can I make this dynamic?
Unfortunately, given my company's security setup, there is no convenient way to use a real S3 bucket (or real container registry, etc.) when testing locally.
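(A hedged workaround sketch, not an official pattern: the S3 storage options are serialized at registration time, so one option is to drive the endpoint from an environment variable and register with whatever value the runtime environment will need. PREFECT_S3_ENDPOINT is a made-up variable name, and flow is the Flow object from the snippet above.)

import os
from prefect.storage import S3

# Assumed env var, set differently on the laptop vs. in the cluster.
s3_url = os.environ.get("PREFECT_S3_ENDPOINT", "http://localstack:4566")

flow.storage = S3(
    bucket="prefect-flows",
    key="hello-task.py",
    client_options={
        "endpoint_url": s3_url,
        "aws_access_key_id": "",
        "aws_secret_access_key": "",
    },
)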
Сергей Романкевич
08/10/2021, 5:19 AM
rsv@srv-etl:~/flow$ /bin/python /home/rsv@comandor.local/flow/Test.py
Traceback (most recent call last):
  File "/home/rsv@comandor.local/flow/Test.py", line 13, in <module>
    flow.register(project_name='Test')
  File "/usr/local/lib/python3.7/dist-packages/prefect/core/flow.py", line 1734, in register
    idempotency_key=idempotency_key,
  File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 1081, in register
    project = self.graphql(query_project).data.project  # type: ignore
  File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 534, in graphql
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 452, in post
    retry_on_api_error=retry_on_api_error,
  File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 696, in _request
    session=session, method=method, url=url, params=params, headers=headers
  File "/usr/local/lib/python3.7/dist-packages/prefect/client/client.py", line 605, in _send_request
    response.raise_for_status()
  File "/usr/local/lib/python3.7/dist-packages/requests/models.py", line 943, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 407 Client Error: Proxy Authentication Required for url: http://localhost:4200/graphql
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-4.19.0-14-amd64-x86_64-with-debian-10.8",
"prefect_backend": "server ",
" prefect_version ":" 0.15.0 ",
" python_version ":" 3.7.3 "
Test.py
from prefect import task, Flow, Parameter

@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)

flow.register(project_name='Test')
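(A hedged guess about the 407: it usually means an HTTP proxy is intercepting the call to the local Apollo endpoint at localhost:4200. A minimal sketch of excluding localhost from proxying before registering; this relies on requests honoring the NO_PROXY environment variable, and the same can be done by exporting NO_PROXY in the shell instead.)

import os

# Make sure calls to the local Prefect Server API bypass the corporate proxy.
os.environ["NO_PROXY"] = ",".join(
    filter(None, [os.environ.get("NO_PROXY", ""), "localhost", "127.0.0.1"])
)

flow.register(project_name='Test')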
Rodrigo Menezes
08/10/2021, 2:15 PM
Julio Venegas
08/10/2021, 4:18 PM
YD
08/10/2021, 11:21 PM
Tom Forbes
08/11/2021, 5:09 PM
Per the scaling out section of the tutorial (https://docs.prefect.io/core/tutorial/06-parallel-execution.html#scaling-out), when using a remote DaskExecutor, logs from tasks will not be mirrored locally and we will have to build a custom logger for this? This limitation isn't explicitly detailed, which is confusing.
Madison Schott
08/12/2021, 10:02 PM
Arun Dass
08/12/2021, 11:02 PM
Arun Dass
08/12/2021, 11:03 PM
Ryan Sattler
08/16/2021, 6:07 AM
Hilary Roberts
08/16/2021, 1:44 PM
kevin
08/16/2021, 6:29 PM
Should I access a secret with prefect.context.get('<SECRET NAME>') or should I use the Secret() class?
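(A hedged sketch of both options in Prefect 0.x/1.x, assuming a secret named MY_SECRET has already been configured; local secrets live in the context, while the Secret class can also resolve Cloud/Server secrets.)

import prefect
from prefect.client import Secret

# 1) Via context: works when the secret is loaded into the local context,
#    e.g. from config.toml or a PREFECT__CONTEXT__SECRETS__* env var.
value_from_context = prefect.context.get("secrets", {}).get("MY_SECRET")

# 2) Via the Secret class, which can also fall back to Cloud/Server secrets.
value_from_class = Secret("MY_SECRET").get()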
08/16/2021, 10:08 PMTask 'my_task': Starting task run...
and Task 'my_task' :Finished task run for task with final state: 'Success'
) are not showing up. I'm trying to log info to help debugging within tasks like so
import prefect
from prefect import task

@task
def my_task(x):
    logger = prefect.context.get("logger")
    logger.info(f"Task input: {x}")
    if isinstance(x, str):
        logger.error("Not good. Input x is a string")
        raise ValueError
    return x
but nothing shows up in the logs on the server UI.
Any ideas what I can do so that logs show up?
Luke Kentwell
08/17/2021, 10:26 AM
Payam Vaezi
08/17/2021, 12:24 PM
[2021-08-17 12:20:57+0000] INFO - prefect.S3 | Flow successfully downloaded. ETag: "ae24fd937c7bfe0317bf996718070f86", LastModified: 2021-08-17T12:13:58+00:00, VersionId: JLZcoHjb99CPDG
[2021-08-17 12:20:58+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for '44cf3c22-40d8-427c-bbf2-d40d4ca113e5'
[2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'source': Starting task run...
[2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'source': Finished task run for task with final state: 'Running'
[2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'sink': Starting task run...
[2021-08-17 12:20:58+0000] INFO - prefect.CloudTaskRunner | Task 'sink': Finished task run for task with final state: 'Pending'
[2021-08-17 12:20:58+0000] INFO - prefect.CloudFlowRunner | Flow run RUNNING: terminal tasks are incomplete.
Haven't seen this problem in local runs! Can someone advise me what may be wrong (I'm getting this intermittently)?
Wilson Bilkovich
08/17/2021, 3:48 PM
prefect server create-tenant --name default
Is there a more straightforward way to do that in future deployments?
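(A hedged alternative sketch: the default tenant can also be created programmatically with the Prefect client, e.g. as a one-off init step in the deployment; the name/slug values are just examples.)

from prefect import Client

# Assumes the Server backend is selected and the API URL is reachable from here.
client = Client()
client.create_tenant(name="default", slug="default")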
08/17/2021, 9:16 PM--set ui.apolloApiUrl=http://${CHART_NAME}-apollo:4200/graphql
on the prefect-server Helm chart, but when I went to the UI, I still had to configure the API URL. Anybody know what I might be missing?Michael Hadorn
08/18/2021, 2:12 PM
May 06 15:06:18 rnddkrpocwe01.uhbs.ch systemd[2918188]: run-docker-runtime\x2drunc-moby-f59082c1b02d87fbf7172c4bef313af171b43336a316ac5ea05f495dc159627d-runc.nCo90B.mount: Succeeded.
May 06 15:06:19 rnddkrpocwe01.uhbs.ch systemd[2918188]: run-docker-runtime\x2drunc-moby-d55a5e41bcd0602af5682df47f4e5b944d17238994b03d9c2b34b8c4ec20c4ad-runc.KWtz2X.mount: Succeeded.
May 06 15:06:19 rnddkrpocwe01.uhbs.ch systemd[1]: run-docker-runtime\x2drunc-moby-d55a5e41bcd0602af5682df47f4e5b944d17238994b03d9c2b34b8c4ec20c4ad-runc.KWtz2X.mount: Succeeded.
According to https://github.com/docker/for-linux/issues/679:
It's produced by the healthchecks.
Is there a way to solve this?
Best, Michael
Pierre Monico
08/19/2021, 10:05 AM
How is everyone getting their Docker Agent deployed?
I am having a hard time understanding how exactly that will work, in particular: since it needs access to a Docker daemon to run e.g. DockerRun run configs, is this supposed to be a "normal" VM? Or can we do some Docker-in-Docker magic?
(NB: I use Docker but am not an expert)
Kevin Kho
08/19/2021, 1:02 PM
Pierre Monico
08/19/2021, 3:15 PM
Looking at the Git storage class source, am I correct to assume that it's not possible to pull from a local repository?
Serdar Tumgoren
08/19/2021, 10:46 PM
Alex Furrier
08/19/2021, 11:14 PM
Alex Furrier
08/20/2021, 4:05 AM
RESULT_HANDLER = AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                             container='prefect')

with Flow(
    'Example Flow',
    executor=LocalDaskExecutor(
        scheduler="threads",
        num_workers=8,
        namespace='prefect'
    ),
) as example_flow:
    result_locations = query_data_process_serialize(query)
    # Logging locations to be sure they are serialized
    log_item(result_locations)
    # This seems to break after ~1000 result locations
    result_locations = deserialize_and_process_more.map(
        record=result_locations,
        result_handler=unmapped(RESULT_HANDLER),
        upstream_tasks=[result_locations]
    )
I've run this a few times and the same thing always happens. I've copied the logged result locations and indexed that list using the failed task children to see if I can deserialize using RESULT_HANDLER, and that works fine. E.g. if the flow failed on child 1100 when mapping deserialize_and_process_more, I tried running RESULT_HANDLER.serializer.deserialize(RESULT_HANDLER.read(result_locations[1100]).value) and was able to deserialize the data just fine. Not sure where the disconnect is happening where the result locations become None somehow.
Pierre Monico
08/20/2021, 2:39 PM
I'm having an issue with prefect server start. It seems like there is a problem with the Postgres instance. First I get errors from the hasura and graphql containers (they can't connect to Postgres), and then some Postgres debugging messages; but the other services still can't connect after that. Did anyone run into that before?
Lawrence Finn
08/20/2021, 3:12 PM
Casey Green
08/24/2021, 3:35 PM
Casey Green
08/24/2021, 3:35 PM
Kevin Kho
08/24/2021, 3:52 PM
You could combine these into one big run_job task, and then LOOP there. You can just do trigger_job.run() and wait_to_complete.run() inside the big task, but they will just be like functions instead of Prefect tasks.
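(A minimal sketch of that suggestion, assuming trigger_job and wait_to_complete are the tasks from the snippet below and that wait_to_complete returns a truthy value once the job is done; the loop payload shape is an assumption.)

import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def run_job(job_name):
    # Re-use the job handle from the previous loop iteration, if there was one.
    loop_payload = prefect.context.get("task_loop_result", {})
    handle = loop_payload.get("handle") or trigger_job.run(job_name)  # plain function call
    if not wait_to_complete.run(handle):  # also just a function call, not a task run
        raise LOOP(message="job still running", result={"handle": handle})
    return handle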
Casey Green
08/24/2021, 3:54 PM
with Flow("test") as flow:
    flow_name = Parameter("flow_name", required=True)
    handle = trigger_job(job_name)
    result = wait_to_complete(handle)

    max_runs = 3
    # maybe this isn't a case... perhaps a terminal_state_handler that inspects the result and sets the state appropriately?
    with case(should_re_run(result), True):
        # trigger flow failure, but automatically retry.
        pass
Kinda looks and feels like a code smell 🤷‍♂️
Kevin Kho
08/24/2021, 4:39 PM
If you use the StartFlowRun task or the create_flow_run task, then we have retries, but the concept of a flow retry is not definitive because some users expect everything to run (even successful tasks) and some expect to run from where it left off. I think retries on these two Prefect tasks can be used for both scenarios by supplying an idempotency_key.
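(A hedged sketch of that idea using create_flow_run with task-level retries; the flow/project names, retry settings, and the idempotency-key value are assumptions.)

from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent-flow") as parent:
    # Retries live on the create_flow_run task itself; the idempotency key
    # controls whether a retry re-uses or re-creates the child flow run.
    create_flow_run(
        flow_name="child-flow",
        project_name="Test",
        idempotency_key="parent-run-1",  # example value
        task_args={"max_retries": 3, "retry_delay": timedelta(minutes=1)},
    )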
Casey Green
08/24/2021, 5:00 PM