Nuno
09/21/2020, 9:44 AM
task doesn’t seem to work for class properties. Here is the error:
  File "/Users/nuno/Developer/Data-Framework/data-prefect/data_prefect/utils/flows.py", line 67, in factory_flow
    flow.fetch()
  File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
    new.bind(
  File "/Users/nuno/Developer/Data-Framework/data-prefect/.venv/lib/python3.8/site-packages/prefect/core/task.py", line 511, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments) # type: Dict
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 3025, in bind
    return self._bind(args, kwargs)
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/inspect.py", line 2940, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'self'
It seems that I cannot pass the method “self” argument.
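The TypeError in the traceback can be reproduced without Prefect. Per the traceback, the task's bind step calls signature.bind(*args, **kwargs) on the decorated function, and a function defined in a class body is still a plain function whose first required parameter is self. The sketch below uses only the standard inspect module; the names Fetcher, fetch, bind_error and fetch_task are hypothetical, and passing the instance explicitly to a module-level function is one possible workaround, not a confirmed Prefect recommendation.

```python
import inspect


class Fetcher:
    def __init__(self, source):
        self.source = source

    # Accessed through the class, this is a plain function whose first
    # parameter is `self`; a decorator applied here sees exactly this.
    def fetch(self):
        return f"fetched from {self.source}"


def bind_error(func, *args, **kwargs):
    """Return the TypeError message from signature binding, or None."""
    try:
        inspect.signature(func).bind(*args, **kwargs)
    except TypeError as exc:
        return str(exc)
    return None


# Binding with no arguments fails because nothing is supplied for `self`.
unbound_error = bind_error(Fetcher.fetch)


# Hypothetical workaround: a module-level function that receives the
# instance as an ordinary argument binds without error.
def fetch_task(fetcher):
    return fetcher.fetch()


bound_error = bind_error(fetch_task, Fetcher("db"))
print(unbound_error)
print(bound_error)
```

If this is indeed the cause, decorating a module-level function such as fetch_task and passing the object in as a regular argument would sidestep the missing self.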
Do you have any suggestions? Thank you in advance.
sark
09/21/2020, 10:55 AM
def get_flow_run_state(client, flow_run_id):
    q = parse_graphql(
        {'query': {
            with_args('flow_run',
                      {'where': {'id': {'_eq': flow_run_id}}}): {
                'state'
            }
        }
        })
    state = client.graphql(q).data.flow_run[0].state
    return state
def wait_flow_complete(flow_run_id):
    client = Client()
    state = None
    while state != 'Success':
        sleep(10)
        state = get_flow_run_state(client, flow_run_id)
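Note that the loop above only exits on 'Success', so it would keep polling forever if the run ends in any other state. Below is a minimal polling sketch that also stops on other finished states and after a timeout. The function wait_for_state, its parameters, and the set of state names are assumptions for illustration; the state strings should be checked against what the GraphQL API actually returns. The state lookup is passed in as a callable so the sketch runs without a server.

```python
import time

# Assumed names of states after which a flow run will not change again.
FINISHED_STATES = {"Success", "Failed", "Cancelled"}


def wait_for_state(get_state, poll_interval=10.0, timeout=3600.0, sleep=time.sleep):
    """Poll get_state() until it returns a finished state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state in FINISHED_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"flow run still in state {state!r} after {timeout}s")
        sleep(poll_interval)


# Usage with a stand-in for get_flow_run_state(client, flow_run_id):
states = iter(["Scheduled", "Running", "Failed"])
final_state = wait_for_state(lambda: next(states), poll_interval=0.0)
print(final_state)
```

With the functions from the message, the call would look like wait_for_state(lambda: get_flow_run_state(client, flow_run_id)). This still polls; it only makes the loop terminate in more cases.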
question: is it possible to avoid the polling and achieve the same thing?
Nuno Silva
09/21/2020, 12:07 PM
DaskKubernetesEnvironment: how do I set the k8s namespace in which we want the job to run in the cluster?
I'm using scheduler_spec_file/worker_spec_file and in the yaml files I set metadata:namespace: <name>. But the job itself starts in the default namespace.
Nuno Silva
09/21/2020, 1:08 PM
image_pull_secret and I've tried setting it in the custom scheduler/worker yaml files and also as an argument in DaskKubernetesEnvironment. Both fail. Looking at the cluster, the error it gives is:
Failed to pull image "<image_url>": rpc error: code = Unknown desc = Error response from daemon: Get <image_url>: unauthorized: authentication required, visit https://aka.ms/acr/authorization for more information.
Looking into the yaml of the job submitted to the cluster (Pods->Actions->Edit) it is clear that there is no field like the one below:
imagePullSecrets:
  - name: regcred
Is it a bug or am I doing something wrong? Thank you
Marek Nguyen
09/21/2020, 2:02 PM
Johnny
09/21/2020, 6:25 PM
Eric
09/21/2020, 11:33 PM
Pedro Machado
09/22/2020, 3:29 AM
start_date and end_date. These can come from two Parameters or, if the parameters are not provided, the dates are computed after applying some logic to prefect.context.scheduled_start_time.
I'd like to use the computed start and end dates for templating the task result's location. If these dates are passed as task inputs, they are accessible (I can do this: location="{start_date}_{end_date}.txt") but I'd also like to use these variables in some downstream tasks that don't list them as inputs.
Is there another way for a downstream task to access these? Since they don't always come from a Parameter, they are not available in prefect.context.parameters. I tried adding them to the context at run time with prefect.context["start_date"] = start_dt but modifying the context like this doesn't feel right. Any suggestions?
Pedro Machado
09/22/2020, 4:04 AM
StringFormatter task with a template that is read at run time?
sark
09/22/2020, 8:53 AM
$ prefect agent start docker -l gcs-flow-storage --volume /var/run/docker.sock:/var/run/docker.sock --volume ~/.config:/root/.config --volume ~/.docker:/root/.docker
but I am still getting
Unexpected error: ImageNotFound(HTTPError('404 Client Error: Not Found for url: http+docker://localhost/v1.40/containers/create'))
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/docker/api/client.py", line 259, in _raise_for_status
    response.raise_for_status()
  File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.40/containers/create
Nejc Vesel
09/22/2020, 9:00 AM
flow_run_id or flow_run_name (i.e. target='{flow_run_id}/foo').
What I have trouble with is designing a method that works both when using Prefect Server and when using Prefect through plain Python.
For example, I am able to set the flow_run_id when running through the Python API with flow.run(flow_run_id=<my flow run id>), however setting flow_run_id seems to be impossible when running through the Prefect Server. Conversely, one can set flow_run_name when running through the Prefect Server but the concept of a flow_run_name doesn't seem to exist in the Python API.
What would be the best way to resolve this issue?
Thank you very much,
Nejc
SImon Hawe
09/22/2020, 9:18 AM
Lewis Bails
09/22/2020, 9:54 AM
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        adapt_kwargs={
            "minimum": 1,
            "maximum": 5,
        },
    ),
)
FWIW, I'm using prefect 0.13.2, dask 2.26.0, distributed 2.26.0
orcaman
09/22/2020, 10:32 AM
Mikael
09/22/2020, 10:49 AM
Vincent
09/22/2020, 12:52 PM
Greg Roche
09/22/2020, 2:35 PM
|--project
   |-.venv
   |-etl
      |--__init__.py
      |--foo
      |  |--__init__.py
      |  |--foo_flow.py
      |--bar
      |  |--__init__.py
      |  |--bar_flow.py
      |--shared_utils
         |--__init__.py
         |--utils.py
In foo_flow.py and bar_flow.py I import modules from shared_utils by appending the etl folder to sys.path before importing utils.py. When I run any of these flows with flow.run() they work fine.
However, when I do flow.register() instead and then start a local agent, from any of the directories listed above, all flow runs initiated from the server to the agent fail instantly with Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'shared_utils'"). According to a stackoverflow answer this is because the agent's python path doesn't include project/etl but it still fails when I run the agent with --import-path "C:\project\etl". I've tried registering the flow from every directory listed above, and tried starting the agent from every directory listed above, and also tried passing every directory listed above as an --import-path argument, and I get exactly the same error every time. Can anybody please point out what I'm missing?
Pedro Machado
09/22/2020, 3:15 PM
target and PandasSerializer to write the csv.
I am wondering what the recommended pattern is for the downstream SFTP task to locate the csv. If I were not relying on the LocalResult to save the file, I'd simply return the path to the local file from the function that generates the csv. However, since I am using the Result mechanism, this task just returns a dataframe.
Do I just have to rely on the templated target location for downstream tasks to find the csv file? In other words, do I have to use the same template and render it manually in the SFTP task?
EmGarr
09/22/2020, 4:19 PM
import prefect
from prefect import Flow, task
@task
def build_job():
    return {'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
@task(log_stdout=True)
def run_step(step):
    print(step)
with Flow('test') as flow:
    config = build_job()
    tmp_task = None
    for step_name in ['a', 'b', 'c']:
        upstream_tasks = None if tmp_task is None else [tmp_task]
        tmp_task = run_step(
            config['steps'][step_name], upstream_tasks=upstream_tasks
        )
flow.run()
2. Use loop to be more dynamic
import prefect
from prefect.engine.signals import LOOP
from prefect import Flow, task
@task
def build_job():
    return {'order': ['a', 'b', 'c'], 'steps': {'a': 'a', 'b': 'b', 'c': 'c'}}
@task(log_stdout=True)
def run_step(job_file):
    # we extract the accumulated task loop result from context
    loop_payload = prefect.context.get("task_loop_result", {})
    step_name = loop_payload.get("step_name", job_file['order'][0])
    step = job_file['steps'][step_name]
    print(step)
    pos = job_file['order'].index(step_name) + 1
    if pos < len(job_file['order']):
        next_step = job_file['order'][pos]
        raise LOOP(message=f"Fib {step_name}", result=dict(step_name=next_step))
with Flow('test') as flow:
    config = build_job()
    run_step(config)
Is there a plan to support something like this?
with Flow('test') as flow:
    config = build_job()
    tmp_task = None
    for name in config['order']:
        config['steps'][name]
Thanks!
abhilash.kr
09/22/2020, 4:39 PM
Marvin
09/22/2020, 4:39 PM
Kevin Weiler
09/22/2020, 8:27 PM
Eric
09/22/2020, 9:12 PM
Berty
09/22/2020, 10:49 PM
Marvin
09/22/2020, 10:49 PM
Spencer
09/23/2020, 2:43 AM
Marvin
09/23/2020, 2:43 AM
josh
09/23/2020, 11:23 AM
0.13.8 has been released and here are a few notable changes:
🐳 Made Docker storage more flexible
⏰ Added more schedule filters (start of month, day of week)
👋 Deprecated /contrib and moved those tasks into the full task library
🤔 Fixed some perplexing issues
A big thank you to our contributors who helped out with this release! Full changelog:
Dinu Gherman
09/23/2020, 11:47 AM
Hassan Javeed
09/23/2020, 11:54 AM
Unexpected error raised during flow run:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 239, in run
    parameters=parameters,
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 184, in initialize_run
    task = tasks[task_run.task_slug]
KeyError: '7d9e37de-16ab-4664-a688-421da03bf1b2'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 162, in handle_state_change
    new_state = self.call_runner_target_handlers(old_state, new_state)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/flow_runner.py", line 126, in call_runner_target_handlers
    prefect.context.update(flow_run_version=version + 1)
TypeError: unsupported operand type(s) for +: 'NoneType' and 'int'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/environments/execution/k8s/job.py", line 179, in run_flow
    runner_cls(flow=flow).run(executor=executor_cls)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 273, in run
    state = self.handle_state_change(state or Pending(), new_state)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 184, in handle_state_change
    raise ENDRUN(Failed(msg, result=exc))
prefect.engine.signals.ENDRUN