Son Mai
06/26/2021, 3:21 PM
Ben Muller
06/28/2021, 3:41 AM
deserialize a .prefect_result file that is persisted to S3 as a result?
Rishabh Poddar
06/28/2021, 7:21 AM
Shivam Shrey
06/28/2021, 9:55 AM
Romain
06/28/2021, 12:30 PM
thebuleon29
06/28/2021, 4:39 PM
RunNamespacedJob tasks in my flow. I installed Prefect using `pip install "prefect[kubernetes]"`, but when I try to run my flow I get `ImportError: Using prefect.tasks.kubernetes requires Prefect to be installed with the "kubernetes" extra.` I also tried installing directly from source by cloning the git repo and running `pip install "./prefect[kubernetes]"`, but I get the same result...
Dror Speiser
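On thebuleon29's `ImportError` above: a minimal diagnostic sketch, assuming (this is not confirmed anywhere in the thread) that the flow runs under a different interpreter or environment than the one `pip` installed into. It uses only the standard library; `module_available` is a hypothetical helper name, not part of Prefect.

```python
import importlib.util
import sys


def module_available(name: str) -> bool:
    """Return True if the top-level module `name` can be located by this interpreter."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # Print the interpreter that is actually running, then whether the
    # client library expected from the "kubernetes" extra can be found.
    print("interpreter:", sys.executable)
    print("kubernetes importable:", module_available("kubernetes"))
```

Running this with the same interpreter that executes the flow shows whether the extra's dependency is visible there.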
06/28/2021, 7:42 PM
Leon Kozlowski
06/28/2021, 8:51 PM
Rob Fowler
06/29/2021, 5:16 AM
John Berrisford
06/29/2021, 8:08 AM
Joseph Ellis
06/29/2021, 11:56 AM
Michael Law
06/29/2021, 1:22 PM
Tom Forbes
06/29/2021, 1:23 PM
import dask.dataframe as dd
dataframe = dd.read_parquet("s3://foo/bar")
dataframe = dataframe.apply(do_something_expensive, axis="columns")
# to_parquet computes eagerly by default, so no separate execute step is needed
dataframe.to_parquet("s3://foo/bar2")
The question is: do Prefect mapping tasks help here? As I understand it, a reduce task in Prefect cannot run on each partition separately, so it needs the full set of inputs to be passed to it. If those inputs don’t fit in memory, will it fail? I must be misunderstanding something, because wouldn’t this limitation be quite… limiting?
Tom Forbes
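On Tom Forbes's map/reduce question above: a minimal sketch of one common pattern, assuming each mapped step persists its own partition and returns only a small reference (a path), so the reduce step receives a list of paths rather than the data itself. This is plain Python, not Prefect API; local temporary files stand in for S3, and `process_partition`, `summarize`, and `run` are hypothetical names.

```python
import json
import tempfile
from pathlib import Path


def process_partition(rows, out_dir, index):
    """Map step: transform one partition, write it out, return only its path."""
    out_path = Path(out_dir) / f"part-{index}.json"
    out_path.write_text(json.dumps([r * 2 for r in rows]))
    return str(out_path)


def summarize(paths):
    """Reduce step: read the partition files one at a time, never all at once."""
    total = 0
    for p in paths:
        total += sum(json.loads(Path(p).read_text()))
    return total


def run(partitions):
    """Run the map step per partition, then reduce over the returned paths."""
    with tempfile.TemporaryDirectory() as out_dir:
        paths = [
            process_partition(rows, out_dir, i)
            for i, rows in enumerate(partitions)
        ]
        return summarize(paths)
```

With this shape, only one partition needs to be in memory at a time in either step.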
06/29/2021, 1:59 PM
from dask_gateway import Gateway
from prefect.executors import DaskExecutor
gateway = Gateway()
cluster = gateway.new_cluster()
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)
flow.run(executor=executor)
How is this supposed to work with flows that use Docker storage? The specific executor needs to be resolved at import time, and following the example code in the docs would mean creating a cluster at import time.
Yanina Libenson
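On Tom Forbes's import-time concern above: a minimal sketch of the general idea of deferring resource creation, assuming the executor can be handed a factory instead of a live cluster. Whether Prefect's `DaskExecutor` can be configured this way for Dask Gateway is not established in this thread; `LazyExecutor` and `make_cluster` are hypothetical names used only for illustration.

```python
class LazyExecutor:
    """Holds a factory and only builds the real resource when started."""

    def __init__(self, cluster_factory):
        self._cluster_factory = cluster_factory  # stored, not called
        self.cluster = None

    def start(self):
        # Called at run time, so importing the module has no side effects.
        self.cluster = self._cluster_factory()
        return self.cluster


created = []


def make_cluster():
    """Hypothetical stand-in for something like gateway.new_cluster()."""
    created.append("cluster")
    return {"scheduler_address": "tcp://example:8786"}


# Safe at import time: nothing has been created yet.
executor = LazyExecutor(make_cluster)
```

The cluster only comes into existence when `executor.start()` is called, which is the property the question asks for.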
06/29/2021, 2:53 PM
Tom Forbes
06/29/2021, 3:11 PM
with Flow() as flow:
    s3_path = get_output_from_task(project="abc", flow="test-flow")
    dataframe = read_data_from_s3(s3_path=s3_path)
I see this is kind of supported by directly using the GraphQL API, and maybe having some convention around terminal output tasks, but I was wondering if there was a better way.
Mahesh
06/29/2021, 3:38 PM
@task
def get_values():
    return ["value", "test", "demo"]
@task(task_run_name="{val}", state_handlers=[post_to_mail])
def compute(val):
    if val == "demo":
        raise ValueError("Nope!")
with Flow("task_run_names") as flow:
    vals = get_values()
    compute.map(vals)
I want to enable state handlers on the tasks so that failed tasks are reported with their task_run_name.
Because the tasks are mapped, I am getting the task name but not the task_run_name.
Is there anything in prefect.context that gives the task_run_name, or any other way to get the task_run_name in state handlers?
Jeremy Phelps
06/29/2021, 6:15 PM
agent@prefect-agent:~$ prefect get logs --name granite-orca > log.log
Traceback (most recent call last):
  File "/home/agent/.pyenv/versions/3.7.3/bin/prefect", line 10, in <module>
    sys.exit(cli())
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/cli/get.py", line 430, in logs
    result = Client().graphql(query)
  File "/home/agent/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
I'm not sure what's going on, I had no trouble until recently.
The Web UI can still retrieve logs, but it takes too long to scroll through them as it only keeps 100 entries in memory at a time.
Brett Naul
06/29/2021, 6:24 PM
requests.exceptions.RetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: /graphql (Caused by ResponseError('too many 429 error responses')), maybe related to the above error?
Brad
06/29/2021, 10:18 PM
TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?) Instead got None
matta
06/29/2021, 10:19 PM
Connor Campbell
06/29/2021, 11:23 PM
takan
06/30/2021, 2:17 AM
jcozar
06/30/2021, 8:31 AM
Nikola Lusic
06/30/2021, 11:15 AM
flow.run()), the custom log handler works. However, when registering the flow to my localhost deployment of Prefect server and agent (flow.register()), the log handlers don't seem to do anything (although the logs are still visible in the Prefect UI).
In the following example, when running the flow locally, all logs are sent to our Logstash instance successfully. However, when registering to the localhost server and running the flow, no logs are sent to Logstash.
import prefect
from prefect import task, Flow
from prefect.run_configs import LocalRun
from prefect.utilities.logging import get_logger as get_prefect_logger
@task(name='Test Task')
def test_task():
    logger = prefect.context.get("logger")
    logger.error('TEST LOG 1 - context logger')
    logger = get_custom_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 2 - custom logger')
    logger = get_prefect_logger()
    logger.addHandler(get_logstash_handler())
    logger.error('TEST LOG 3 - util logger')
    return
with Flow("test_flow") as flow:
    test_task()
task_logger = get_prefect_logger('Test Task')
task_logger.addHandler(get_logstash_handler())
if __name__ == "__main__":
    flow.run_config = LocalRun()
    #flow.run()
    #flow.register(project_name='localhost')
Is there something I'm missing in this setup?
Matheus Cruz
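On Nikola Lusic's logging question above: the thread does not identify the cause, so this is only a sketch of one defensive pattern, assuming handlers configured at module level during registration are not guaranteed to exist in the process that later executes the flow. It attaches the handler inside the task body and guards against adding it twice. Only the standard `logging` module is used; `ensure_handler` and `do_work` are hypothetical names, and `NullHandler` stands in for a real Logstash handler.

```python
import logging


def ensure_handler(logger: logging.Logger, handler: logging.Handler) -> bool:
    """Attach `handler` unless a handler of the same type is already attached."""
    if any(type(h) is type(handler) for h in logger.handlers):
        return False
    logger.addHandler(handler)
    return True


def do_work():
    """Stand-in for a task body: configure logging at run time, then log."""
    logger = logging.getLogger("example.task")
    # A real handler (for example one that ships to Logstash) would go here.
    ensure_handler(logger, logging.NullHandler())
    logger.error("TEST LOG - handler attached inside the task body")
    return len(logger.handlers)
```

Calling `do_work()` repeatedly leaves exactly one handler on the logger, which avoids duplicate log shipping when a task runs more than once in the same process.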
06/30/2021, 12:33 PM
Jacob Baruch
06/30/2021, 1:27 PM
Bruno Murino
06/30/2021, 2:03 PM
import prefect
from prefect import task, Flow, Parameter, unmapped, case
from prefect.tasks.shell import ShellTask
shell_task = ShellTask(stream_output=True)
@task
def get_config():
    return {
        'path': '/root/',
    }
with Flow("test") as flow:
    config = get_config()
    bar = shell_task(command="pwd", helper_script = f"cd {config['path']}")
flow.run()
Bruno Murino
06/30/2021, 2:04 PM
[2021-06-30 14:03:32+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config': Starting task run...
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config': Finished task run for task with final state: 'Success'
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2021-06-30 14:03:32+0000] INFO - prefect.ShellTask | /tmp/prefect-if2skw8w: line 1: syntax error near unexpected token `newline'
[2021-06-30 14:03:32+0000] INFO - prefect.ShellTask | /tmp/prefect-if2skw8w: line 1: `cd <Task: get_config['path']>'
[2021-06-30 14:03:32+0000] ERROR - prefect.ShellTask | Command failed with exit code 2
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Command failed with exit code 2')
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'ShellTask': Finished task run for task with final state: 'Failed'
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config['path']': Starting task run...
[2021-06-30 14:03:32+0000] INFO - prefect.TaskRunner | Task 'get_config['path']': Finished task run for task with final state: 'Success'
[2021-06-30 14:03:32+0000] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
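On Bruno Murino's log above: the line `cd <Task: get_config['path']>` suggests the f-string was evaluated while the flow was being built, when `config` was still a placeholder rather than a dict. The sketch below reproduces that effect in plain Python and shows the usual remedy of deferring the formatting until the real value exists. `Placeholder` and `make_helper_script` are hypothetical names, not Prefect classes; in a real flow the deferred function would presumably become its own task, which the thread does not confirm.

```python
class Placeholder:
    """Hypothetical stand-in for a value that only exists at run time."""

    def __init__(self, label):
        self.label = label

    def __getitem__(self, key):
        # Indexing yields another placeholder, not the real value.
        return Placeholder(f"{self.label}[{key!r}]")

    def __repr__(self):
        return f"<Task: {self.label}>"


# Build time: the f-string formats the placeholder's repr immediately.
config = Placeholder("get_config")
built_too_early = f"cd {config['path']}"


def make_helper_script(real_config):
    """Formatting deferred until the real dict is available."""
    return f"cd {real_config['path']}"
```

Here `built_too_early` holds the same broken command seen in the log, while `make_helper_script({'path': '/root/'})` produces `cd /root/`.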