Michelle Wu
11/30/2020, 2:08 AM
dask-resource in tags as argument for @task:
running_server = "name_of_remote_server"
resource_tag = "dask-resource:{}=1".format(running_server)
@task(log_stdout=True, state_handlers=[email_on_failure], tags=[resource_tag])
def test_task():
    print(1/0)
When I started the remote dask-worker, I used a command like this:
dask-worker tcp://<address of local dask-scheduler>:8786 --nprocs 4 --nthreads 1 --worker-port xxx --resources "name_of_remote_server=1"
This connected the local scheduler to the remote worker without problems. However, when I started the flow on the local machine, it first failed on the remote worker with ModuleNotFoundError: No module named 'prefect'. After I installed prefect on the remote worker, another error occurred there:
[2020-11-24 17:02:05+0800] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6099fcf10>: Failed to establish a new connection: [Errno 111] Connection refused'))"))
Traceback (most recent call last):
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/anaconda3_a/envs/py37/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
...
[2020-11-24 17:02:05+0800] INFO - prefect.CloudTaskRunner | Task 'test_task': Finished task run for task with final state: 'ClientFailed'
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Failed to write log with error: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff6097590d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
[2020-11-24 09:02:10+0000] CRITICAL - CloudHandler | Unable to write logs to Prefect Cloud
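The log above shows the task runner on the worker trying to reach host='localhost' on port 4200, which from the worker's point of view is the worker machine itself. A minimal sketch, using only the Python standard library, for checking from the worker whether a given host and port accept TCP connections. The function name can_connect and any host passed to it are illustrative assumptions, not part of Prefect or Dask:

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

Calling can_connect("localhost", 4200) on the worker, and then again with the address of the machine that hosts the API, would show which of the two endpoints the worker can actually reach.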
I tried opening port 4200 on the local scheduler for the remote worker, but the same error occurred anyway. What am I doing wrong? 😶
Klemen Strojan
11/30/2020, 9:31 AM
Eric
11/30/2020, 9:53 AM
prefect.utilities.exceptions.ClientError: 400 Client Error: Bad Request for url: http://<Prefect Cloud IP>:4200
This is likely caused by a poorly formatted GraphQL query or mutation. GraphQL sent:
    query {
        query { auth_info { api_token_scope } }
    }
    variables {
        null
    }
Did I miss a step when setting up the Prefect agent? Thank you very much!
Jonas Hanfland
11/30/2020, 10:37 AM
pending indefinitely. Not even the timeout provided to @task gets triggered.
When checking the logs for the mapped task in question, I found what seems to be an internal server error (see thread).
Rerunning it multiple times in the past resulted in the same issue (with the same exception), except this Saturday when it miraculously succeeded but then failed again the next day.
Is that exception the cause of the problem? Is the issue on Prefect's side or on mine? Thanks in advance.
emre
11/30/2020, 10:55 AM
flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4)) and can execute tasks in parallel. The server registration is as follows:
flow.storage = Docker(....)
flow.run_config = DockerRun()
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
flow.register(project_name=... , build=False)
The flow can run and communicate with Prefect Server, but the tasks are executed sequentially. Both the Gantt chart and the logs show this.
Am I missing something? (prefect==0.13.13)
Ben
11/30/2020, 2:16 PM
run_config then.
Is that use case normal or am I thinking about it wrong? Just to double check that I'm not attempting anything totally unidiomatic here.
matta
11/30/2020, 5:22 PM
matta
11/30/2020, 5:22 PM
dbt_task = DbtShellTask(
    ...
)
with Flow("dbt-flow") as flow:
    dbt_command_string = Parameter('dbt_command_string', default='dbt run', required=True)
    dbt_commands = dbt_command_string.split(',')
    for command in dbt_commands:
        dbt_task(
            command=command,
            ...
        )
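A Parameter only receives its value when the flow runs, so splitting the command string has to happen at run time (inside a task) rather than while the flow is being built. A minimal plain-Python sketch of the splitting and in-order execution that such a task could perform. The names split_commands and run_serially are hypothetical, and runner stands in for whatever actually executes one dbt command:

```python
from typing import Callable, List


def split_commands(command_string: str) -> List[str]:
    """Split a comma-separated string into individual, trimmed commands."""
    return [part.strip() for part in command_string.split(",") if part.strip()]


def run_serially(commands: List[str], runner: Callable[[str], str]) -> List[str]:
    """Run each command in order and collect the results."""
    results = []
    for command in commands:
        # Each call finishes before the next one starts.
        results.append(runner(command))
    return results
```

For example, run_serially(split_commands("dbt run, dbt test"), runner) calls runner with "dbt run" first and "dbt test" second.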
matta
11/30/2020, 5:23 PM
map that forces it to run serially?
matta
11/30/2020, 5:23 PM
matta
11/30/2020, 5:23 PM
matta
11/30/2020, 5:23 PM
John Grubb
11/30/2020, 7:25 PM
Unexpected error while running flow: KeyError('Task slug foo-3 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
It's always the same task for both pipelines: I've split out a common task that loads data to BigQuery into another file, and this is the task that fails. It runs fine for a few days and then randomly decides that it can't find this task, even though the exact same task is called two other times during each pipeline, as foo-1 and foo-2. I'm wondering if I'm the first person to have their install randomly lose tasks.
josh
11/30/2020, 8:45 PM
0.13.18 has been released and here are a few notable changes:
⁉️ Formatted GraphQL errors are now raised by the Client
👟 Refactored a few Client API calls for performance
📚 Various enhancements and fixes to tasks in the Task Library
A big thank you to our contributors who helped out with this release! Full changelog https://github.com/PrefectHQ/prefect/releases/tag/0.13.18
Sam Luen-English
11/30/2020, 9:40 PM
FROM python:3.8
RUN python -m pip install sqlalchemy prefect
main.py
from prefect import Flow, task
from prefect.environments.storage import Docker
import sqlalchemy
@task
def some_task():
    print(sqlalchemy.__version__)
with Flow("Some Flow") as flow:
    some_task()
if __name__ == "__main__":
    flow.storage = Docker(
        registry_url="docker.io",
        image_name="storage_image",
        image_tag="latest",
        base_image="test-base",
        local_image=True,
    )
    flow.register(project_name="Development")
Test script:
#!/usr/bin/env bash
set -x
docker build . -t test-base
#: Approach 1
python main.py
#: Approach 2
docker run -v /var/run/docker.sock:/var/run/docker.sock -v ~/.docker:/root/.docker -e PREFECT__CLOUD__AUTH_TOKEN="$PREFECT__CLOUD__AUTH_TOKEN" -v $(pwd)/main.py:/main.py test-base bash -c 'python /main.py'
Please could you guide me with the best approach?
Many thanks!
Pedro Machado
12/01/2020, 5:09 AM
start_date parameter that is not required. When the parameter is passed, we validate it and use it in the flow. When it's not passed, we use the flow's scheduled_start_time to derive the start date. This complicates the flow a little bit. I normally have to create a downstream task that takes the Parameter result and applies this logic.
I was thinking that it would be nice to be able to pass a callable as the default argument of the Parameter class to provide a value if the parameter is missing. Similarly, it would be nice to have a validator callable that could be used to validate the parameter value.
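The fallback-and-validate logic described above could be sketched in plain Python as follows. This is only an illustration: the function name resolve_start_date and the YYYY-MM-DD format are assumptions, and in a flow this logic would sit in the downstream task that receives the Parameter result.

```python
from datetime import date, datetime
from typing import Optional


def resolve_start_date(start_date: Optional[str], scheduled_start_time: datetime) -> date:
    """Validate start_date if given, otherwise derive it from the scheduled start time."""
    if start_date is None:
        # Parameter was not passed: fall back to the scheduled start time.
        return scheduled_start_time.date()
    try:
        # Assumed format; adjust to whatever the flow actually accepts.
        return datetime.strptime(start_date, "%Y-%m-%d").date()
    except ValueError as exc:
        raise ValueError(f"start_date must be YYYY-MM-DD, got {start_date!r}") from exc
```

Keeping both the default and the validation in one function mirrors the "default callable plus validator callable" idea without requiring any change to the Parameter class.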
Is there another way to accomplish this? Thoughts?
Eric
12/01/2020, 6:55 AM
Sam Luen-English
12/01/2020, 1:03 PM
from prefect import Flow, task
from prefect.environments.storage import Docker
import sqlalchemy
@task
def some_task():
    print(sqlalchemy.__version__)
with Flow("test") as flow:
    some_task()
if __name__ == "__main__":
    flow.storage = Docker(
        registry_url="docker.io/atheon",
        image_name="prefect-test",
        base_image="atheon/prefect-test:base",
    )
    flow.register(project_name="Development")
karteekaddanki
12/01/2020, 3:05 PM
---> Running in 26ae91f84100
Beginning health checks...
System Version check: OK
Cloudpickle serialization check: OK
Result check: OK
Environment dependency check: OK
All health checks passed.
Removing intermediate container 26ae91f84100
---> a63da9840884
Traceback (most recent call last):
  File "register.py", line 60, in <module>
    storage.build()
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 359, in build
    self._build_image(push=push)
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 423, in _build_image
    self._parse_generator_output(output)
  File "/home/karteek/anaconda3/envs/prefect/lib/python3.6/site-packages/prefect/environments/storage/docker.py", line 661, in _parse_generator_output
    or parsed.get("errorDetail", {}).get("message")
AttributeError: 'NoneType' object has no attribute 'strip'
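The last frame of the traceback suggests that a chain of .get() lookups on a parsed build-output line returned None, and .strip() was then called on it. A minimal sketch of a None-safe version of that pattern follows. It is not Prefect's actual code: the function name extract_message and the exact set of keys checked are assumptions made for illustration.

```python
import json
from typing import Optional


def extract_message(raw_line: str) -> Optional[str]:
    """Return the human-readable text of one JSON build-output line, if any."""
    parsed = json.loads(raw_line)
    text = (
        parsed.get("stream")
        or parsed.get("message")
        or (parsed.get("errorDetail") or {}).get("message")
    )
    # Lines that carry none of these keys yield None instead of raising.
    return text.strip() if text else None
```

With this guard, a line such as {"aux": {"ID": "sha256:abc"}} is skipped rather than triggering an AttributeError.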
Any thoughts on what might be happening here? I can confirm that the Docker image builds without any issues and the logs show that all the health checks pass. I am using Python 3.6.
Charles Lariviere
12/01/2020, 3:41 PM
flow.run() (instead of flow.register()) and running with a local agent, everything works great. However, when I run the following script to register it in our Prefect Cloud account and execute it through the UI, it works (as in, the query gets executed) but I get the following error: Unexpected error: TypeError("can't pickle _thread.lock objects").
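This kind of TypeError appears when something that has to be serialized holds a thread lock, as many database clients and connections do. A minimal sketch of the failure and of one common way around it, assuming the lock lives in a client object (an assumption, since the script is not shown). Connection, is_picklable, and run_query are hypothetical stand-ins:

```python
import pickle
import threading


class Connection:
    """Hypothetical stand-in for a client object that holds a thread lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()


def is_picklable(obj: object) -> bool:
    """Report whether obj survives pickle.dumps."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def run_query(sql: str) -> str:
    """Create the connection at run time so it never needs to be pickled."""
    conn = Connection()
    with conn._lock:
        return f"executed: {sql}"
```

Here is_picklable(Connection()) is False, while the plain string passed to run_query pickles without trouble, so only simple values cross the serialization boundary.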
My question is: what are best practices for structuring these scripts? Once you’ve built the flow, should you always end it with flow.run() and flow.register()? Or do you register flows manually when making changes, without including the register() call inside the actual script?
Sean Talia
12/01/2020, 5:26 PM
Andrey Tatarinov
12/01/2020, 6:06 PM
Andrew Hannigan
12/01/2020, 8:24 PM
Zach
12/01/2020, 9:38 PM
Pedro Machado
12/02/2020, 1:29 AM
Slackbot
12/02/2020, 11:54 AM
Alex Koay
12/02/2020, 1:29 PM
Rodrigo Ceballos
12/02/2020, 2:49 PM
Iason Andriopoulos
12/02/2020, 2:52 PM
Andrey Tatarinov
12/02/2020, 3:49 PM
Andrey Tatarinov
12/02/2020, 3:49 PM
Andrew Hannigan
12/02/2020, 5:14 PM
Andrey Tatarinov
12/02/2020, 6:07 PM
Andrew Hannigan
12/02/2020, 6:51 PM
Andrey Tatarinov
12/02/2020, 7:44 PM
def make_flow(Result) which accepts a Result constructor.
Andrew Hannigan
12/02/2020, 8:03 PM