Hui Zheng
10/23/2020, 11:35 PM
Failed to set task state with error: HTTPError('400 Client Error: Bad Request for url: https://api.prefect.io/graphql')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 124, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1399, in set_task_run_state
result = self.graphql(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 275, in graphql
result = self.post(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 230, in post
response = self._request(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 400, in _request
response = self._send_request(
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 333, in _send_request
response.raise_for_status()
File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://api.prefect.io/graphql
Is it a Prefect Cloud API error? Is there anything I could do to avoid it, or retry it in a better way?
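A minimal sketch of task-level retries, purely as a generic mitigation (it may not address the underlying 400, which occurs while the task runner reports state to Cloud):

from datetime import timedelta
from prefect import task

# retry the task a few times before giving up; illustrative values only
@task(max_retries=3, retry_delay=timedelta(minutes=1))
def fragile_task():
    return "ok"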
Jeff Friesen
10/24/2020, 1:38 AM
Jeff Friesen
10/24/2020, 1:40 AM
Scott Asher
10/24/2020, 4:23 AM
execute() missing 1 required positional argument: 'flow_location'
Traceback (most recent call last):
File "/usr/local/bin/prefect", line 11, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 34, in flow_run
return _execute_flow_run()
File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
raise exc
File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 93, in _execute_flow_run
environment.execute(flow)
TypeError: execute() missing 1 required positional argument: 'flow_location'
[2020-10-24 04:14:03,330] INFO - agent | Process PID 23114 returned non-zero exit code
This error confuses me, primarily because I don't see my custom executor anywhere in the stack, nor do I understand what was supposed to be passing the location around.
The code runs fine when run locally using `flow.run()`:
[2020-10-24 04:01:44] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
[2020-10-24 04:01:44] INFO - prefect.TaskRunner | Task 'random_number': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'random_number': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': Starting task run...
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Code is (imports removed to save space):
@task(name="random_number")
def random_number():
    time.sleep(5)
    return random.randint(0, 100)
@task(name="plus_one")
def plus_one(x):
    return x + 1

@task(name="plus_two")
def plus_two(x):
    return x + 2

@task(name="plus_three")
def plus_three(x):
    return x + 3
#with Flow('test_flow', environment=LocalEnvironment(executor=PipelineExecutor())) as flow:
flow = Flow('test_flow', storage=Local(), environment=PipelineEnvironment())
plus_three.bind(plus_one, flow=flow)
plus_two.set_upstream(plus_one, flow=flow)
plus_two.bind(plus_one, flow=flow)
plus_one.set_upstream(random_number, flow=flow)
plus_one.bind(random_number, flow=flow)
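As far as one can tell from the traceback (an inference, not something stated in the thread), Prefect 0.13.x calls environment.execute(flow) with the Flow object, while older releases used execute(storage, flow_location); a custom environment written against the old signature would raise exactly this TypeError. A minimal sketch of the newer interface, with PipelineEnvironment standing in for the custom class:

from prefect import Flow
from prefect.environments.execution import Environment

class PipelineEnvironment(Environment):
    """Sketch only: a custom environment matching the newer execute(flow) interface."""

    def execute(self, flow: Flow, **kwargs) -> None:
        # Newer Prefect hands over the Flow object directly; environments still
        # expecting (storage, flow_location) hit the TypeError shown above.
        flow.run(**kwargs)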
Scott Asher
10/24/2020, 4:24 AM
xxxx@xxxx:~/xxxx/xxxxxxxx % prefect register flow --project="Daily DAG" --file /tmp/testprefect.py
Scott Asher
10/24/2020, 4:25 AM
ale
10/24/2020, 10:49 AM
config.toml?
Darragh
10/24/2020, 3:10 PM
PREFECT__SERVER__UI__GRAPHQL_URL is not working with 0.13.12. The docs seem to suggest that graphql_url has changed to apollo_url, so I tried updating the env var to PREFECT__SERVER__UI__APOLLO_URL, or even PREFECT__SERVER__SERVER__UI__APOLLO_URL, but no luck. Any suggestions? I also tried PREFECT__TELEMETRY__SERVER__TELEMETRY__ENABLED=false to disable telemetry, but no luck there either.
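A quick diagnostic sketch (an assumption about where to look, not a confirmed fix): Prefect exposes whatever it resolved from config.toml and the environment on prefect.config, so printing the UI section on the machine running prefect server start shows which key actually took effect:

# inspect the resolved [server.ui] config before starting the server
# (assumes Prefect >= 0.13, where apollo_url replaced graphql_url)
import prefect

print(prefect.config.server.ui)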
Marley
10/24/2020, 8:49 PM
Anish Chhaparwal
10/24/2020, 11:34 PM
import argparse
from prefect import Flow, Parameter, task
import prefect
from prefect.environments import LocalEnvironment
@task
def testing(args):
    logger = prefect.context.get("logger")
    logger.info(f"args value for name is: {args.name}")
    logger.info(f"args value for workers is: {args.workers}")

with Flow("args_trial") as flow:
    opts = Parameter("opts")
    testing(args=(opts))
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="test pywinrm")
    parser.add_argument("--workers", type=int, default=10)
    parser.add_argument("--sleep_time", type=int, default=2)
    parser.add_argument("--name", type=str, default="args_tester")
    opts = parser.parse_args()
    # flow.run(opts=opts)
    flow.environment = LocalEnvironment(labels=["qure9", "pipelines"])
    flow.register(project_name="Test")
I can't seem to figure out how to pass the default args while registering. Also, if I have to convert each of them into Prefect Parameters, I'd like to pass all parameters to a function using something like def func_1(args) instead of specifying 15-20 to each function. Is that possible?
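A hedged sketch of one way to do it: fold the argparse defaults into a single dict-valued Parameter, so registration bakes in the defaults and every task takes one opts argument instead of 15-20 separate ones (names mirror the snippet above):

from prefect import Flow, Parameter, task
import prefect

@task
def testing(opts: dict):
    logger = prefect.context.get("logger")
    logger.info(f"opts value for name is: {opts['name']}")
    logger.info(f"opts value for workers is: {opts['workers']}")

with Flow("args_trial") as flow:
    # defaults are stored with the registered flow; Cloud/Server can override the whole dict per run
    opts = Parameter("opts", default={"workers": 10, "sleep_time": 2, "name": "args_tester"})
    testing(opts)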
Scott Asher
10/25/2020, 9:03 PM
__main__ (moved code blocks into thread)
Narasimhan Ramaswamy
10/25/2020, 10:04 PM
from prefect import task, case, apply_map, Flow, unmapped
import prefect
from prefect.tasks.control_flow import merge
from prefect.tasks.shell import ShellTask
from prefect.engine.results import LocalResult
from prefect.tasks.control_flow import case
from prefect.environments.storage import Docker
import docker
def test2(name):
    logger = prefect.context.get("logger")
    logger.info("Inside Task 2")
    a = ShellTask(name=name, command="ls", return_all=True, log_stderr=True)
    return a
@task(log_stdout=True)
def inc_if_even(x):
    logger = prefect.context.get("logger")
    logger.info("----Start----")
    name = r'Shell' + str(x)
    b = test2(name).run()
    logger.info(f"{b}")
    logger.info("----End----")
    return 1
@task
def reduce(x):
    z = sum(x)
    if z > 1:
        return True
    else:
        return False

@task
def return_num(x):
    return x + 10

@task
def logger_st(status):
    logger = prefect.context.get("logger")
    logger.info(f"Task Failed - {status}")
    return 1
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect.environments import LocalEnvironment

with Flow("test-flow", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    logger = prefect.context.get("logger")
    return_nums = return_num.map([1,2,3,4,5,6,7])
    result = inc_if_even.map(return_nums)
    reduced = reduce(result)
Hi all, just a quick question on DaskExecutor(): when running with flow.run(), the mapped tasks run in parallel, but when running on Cloud they run in sequence. My setup is an agent running on Kubernetes. Can you please help with what I should be doing?
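One thing worth double-checking (an assumption, not a confirmed diagnosis): the agent runs whatever environment/executor was stored with the flow at registration time, so the executor has to be attached before flow.register() and the flow re-registered after any change. A self-contained sketch with illustrative names:

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

@task
def noop(x):
    return x

with Flow("dask-parallel-check") as flow:
    noop.map([1, 2, 3, 4])

flow.environment = LocalEnvironment(
    executor=DaskExecutor()  # or DaskExecutor(address="tcp://<scheduler>:8786") for an existing cluster
)
flow.register(project_name="my-project")  # hypothetical project name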
Scott Asher
10/26/2020, 1:45 AM
File "/usr/scratch/sasher/pyenvs/prefect/lib/python3.6/site-packages/prefect/utilities/storage.py", line 85, in extract_flow_from_file
raise ValueError("No flow found in file.")
ValueError: No flow found in file.
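For reference (an inference from the later __main__ comment, not a confirmed fix): extract_flow_from_file looks for a Flow object at module scope, so a flow built only inside an if __name__ == "__main__": block is invisible to prefect register flow --file. A sketch of the layout registration expects:

from prefect import Flow, task

@task
def say_hi():
    print("hi")

with Flow("test_flow") as flow:  # module level, so extract_flow_from_file can find it
    say_hi()

if __name__ == "__main__":
    flow.run()  # local execution still works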
Scott Asher
10/26/2020, 3:47 AM
"Prefect's imperative API allows more fine-grained control. Its main advantage over the functional API is that it allows tasks to be set as upstream or downstream dependencies without passing their results. This allows you to create a strict ordering of tasks through state dependencies without also creating data dependencies." And then you give an example where you force a "PlusOne" task to run after a task that simply prints something. My question is: what if you WANT a data dependency? Can you do this through the imperative API? More code blocks to follow in thread so I don't clutter up the main thread.
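Not an official answer, but the imperative API can carry data too: set_upstream/set_downstream accept a key argument that maps the upstream result onto a named argument of the downstream task, and bind() passes results directly. A self-contained sketch with illustrative task classes:

from prefect import Flow, Task

class RandomNumber(Task):
    def run(self):
        return 4

class PlusOne(Task):
    def run(self, x):
        return x + 1

flow = Flow("imperative-data-dep")
random_number, plus_one = RandomNumber(), PlusOne()

# key="x" maps random_number's result onto plus_one's `x` argument,
# so this edge is a data dependency as well as an ordering constraint
plus_one.set_upstream(random_number, flow=flow, key="x")
# plus_one.bind(random_number, flow=flow)  # bind() likewise passes the result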
Leon Hao
10/26/2020, 8:04 AMexecution_date
is not interpreted by Airflow as the start time of the DAG, but rather the end of an interval capped by the DAG’s start time.
I believe execution_date in Airflow is the start time of the schedule interval?Hagai Arad
10/26/2020, 9:49 AM
tsar
10/26/2020, 10:10 AM
ERROR: Invalid interpolation format for "apollo" option in service "services": "${GRAPHQL_HOST_PORT:-4201}"
Exception caught; killing services (press ctrl-C to force)
ERROR: Invalid interpolation format for "apollo" option in service "services": "${GRAPHQL_HOST_PORT:-4201}"
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/cli/server.py", line 332, in start
["docker-compose", "pull"], cwd=compose_dir_path, env=env
File "/usr/lib64/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['docker-compose', 'pull']' returned non-zero exit status 1.
banditelol
10/26/2020, 11:29 AM
1. … flow.run() to run it locally and change it to flow.register() when I'm done.
2. Push the changes to the git repo.
3. Pull it from the server.
4. Run the modified/created flow so that it's registered on Prefect Server (btw, I have one agent running in the background on the server).
5. Activate the Flow from the UI.
I feel there's clearly a better way to do this, but I haven't found anything yet from googling. I'd really appreciate it if anyone could give me a clue on this. Thanks 🙂
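One common pattern (illustrative, not the only way): keep both paths in the script behind a tiny CLI switch, so the same file can be run locally or registered from the server checkout without editing it (flow and project names are placeholders):

import sys
from prefect import Flow, task

@task
def hello():
    print("hello")

with Flow("my-flow") as flow:
    hello()

if __name__ == "__main__":
    # `python my_flow.py` runs locally; `python my_flow.py --register` registers it
    if "--register" in sys.argv:
        flow.register(project_name="my-project")
    else:
        flow.run()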
Ralph Willgoss
10/26/2020, 2:32 PM
Marwan Sarieddine
10/26/2020, 2:38 PM
Unexpected error: AttributeError("'NoneType' object has no attribute 'is_finished'")
Marwan Sarieddine
10/26/2020, 3:36 PM
tsar
10/26/2020, 3:57 PM
Marley
10/26/2020, 4:33 PM
…trigger=all_successful), and one that only notifies if something failed (trigger=any_failed). I've added a custom state handler to trigger failed (see code block in thread) that should SKIP if TriggerFailed. Raising that SKIP is causing the Task to fail in Prefect Cloud. I previously tried to leverage the Flow on_failure, but for some reason it wasn't sending my notifications. Am I missing something re: raising a SKIP signal on the last Task? Is there something special happening in a Flow's on_failure preventing it from sending a Slack notification?
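Possibly relevant (an assumption about the mechanism, not a confirmed diagnosis): a state handler can return a replacement state instead of raising a signal, which sidesteps raising SKIP inside the handler altogether. A sketch:

from prefect import task
from prefect.engine import state

def skip_if_trigger_failed(task_obj, old_state, new_state):
    # If the task's trigger failed (e.g. any_failed with nothing failed upstream),
    # end the notification task as Skipped rather than Failed.
    if isinstance(new_state, state.TriggerFailed):
        return state.Skipped("Nothing failed upstream; skipping failure notification")
    return new_state

@task(state_handlers=[skip_if_trigger_failed])
def notify_failure():
    ...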
Nuno
10/26/2020, 4:39 PM
Isaac Brodsky
10/26/2020, 7:12 PM
Unexpected error: KeyError('lz4')? Seems the flow itself, rather than a task, is failing. This is using Docker storage and `LocalEnvironment`/`DaskExecutor` with Dask running on Kubernetes. Seems like somehow lz4 is not present where the job is started? I do install pyarrow using python_dependencies in the Docker storage, so I'd expect lz4 to be there. I'm not sure where else lz4 could be missing.
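A guess at the usual culprit (not confirmed in the thread): Dask only uses lz4 compression when the library is importable on both ends, so if the Dask workers on Kubernetes have lz4 but the flow-run image does not, deserialization can fail with KeyError('lz4'). Pinning it in the image is a cheap check:

from prefect.environments.storage import Docker

# add lz4 alongside pyarrow so the flow-run image matches the Dask workers (assumption)
storage = Docker(python_dependencies=["pyarrow", "lz4"])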
james.lamb
10/26/2020, 10:31 PM
…prefect source code. help(LocalResult) shows the following:
LocalResult(dir: str = None, validate_dir: bool = True, **kwargs: Any) -> None
Result that is written to and retrieved from the local file system.
Is it fair to say that "local" in this case means "local to where the task is physically run" and not "local to wherever flow.run() is called from"?
wesley gabriel
10/27/2020, 12:33 AM
Dean Magee
10/27/2020, 3:36 AM
Matt Drago
10/27/2020, 7:38 AM
…States and Caching and Persisting Data in the doco. Would I be able to store the position that the extract got up to in one of these things, or is the Result object the right thing to use, and then interrogate the Result of the last successful run of the Task?
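Purely as an illustration of the persist-it-yourself approach (path and task names are made up): a small pair of tasks can store the position the extract reached and hand it to the next run, independent of task-state caching:

import json
import os

from prefect import task

WATERMARK_PATH = "/tmp/extract_watermark.json"  # assumes a location that survives between runs

@task
def load_watermark() -> int:
    """Return the position the last successful extract reached (0 if none)."""
    if os.path.exists(WATERMARK_PATH):
        with open(WATERMARK_PATH) as f:
            return json.load(f)["position"]
    return 0

@task
def save_watermark(position: int) -> None:
    """Record how far this extract got, for the next run to read."""
    with open(WATERMARK_PATH, "w") as f:
        json.dump({"position": position}, f)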
John Grubb
10/27/2020, 12:37 PM