Joël Luijmes
09/27/2021, 9:25 AMScarlett King
09/27/2021, 3:34 PMfrom project_name.tasks import …I’m trying to deploy this by running
prefect register --project project_name -m "project_name.flows" but I’m getting the error No module named 'project_name'. What is the best way to do this? I’m trying to deploy the flow with Azure storage.
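The `-m` flag makes Prefect import the module in the process running the CLI, so `project_name` must be importable from wherever the command is run. A stdlib-only sketch of why the error appears and goes away (the repo layout below is an assumption built for illustration):

```python
import importlib
import os
import sys
import tempfile

# Simulate a repo layout <root>/project_name/flows.py ("project_name" is
# from the question above; the rest is an assumption for illustration).
root = tempfile.mkdtemp()
pkg = os.path.join(root, "project_name")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "flows.py"), "w") as f:
    f.write("FLOWS = ['etl']\n")

# Without the repo root on sys.path this import raises ModuleNotFoundError --
# the same failure `prefect register -m` surfaces. Putting the root on the
# path (what `pip install -e .` or PYTHONPATH=. effectively does) fixes it.
sys.path.insert(0, root)
mod = importlib.import_module("project_name.flows")
print(mod.FLOWS)
```

So registering from the repo root after `pip install -e .` (or with `PYTHONPATH` pointing at the repo root) is usually enough.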
Alexander van Eck
09/27/2021, 5:14 PMfrom typing import Any, Union

from prefect import Flow, Task
from prefect.engine.state import Cancelled, Cancelling, State

def cancel(obj: Union[Task, Flow], _: State, new_state: State) -> State:
    if isinstance(new_state, (Cancelling, Cancelled)):
        obj.some_arg.cancel()  # let them know we're no longer waiting
    return new_state

class RemoteTask(Task):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        state_handlers = kwargs.pop('state_handlers', [])
        state_handlers.append(cancel)
        kwargs['state_handlers'] = state_handlers
        super().__init__(*args, **kwargs)
Aric Huang
09/27/2021, 5:32 PMserver
backend. Are there any recommended ways of handling server auth through the CLI, and/or is there a possibility of adding some auth options (e.g. basic auth, bearer token) to the CLI when using server
backend?Ismail Cenik
09/27/2021, 9:32 PMPayam Vaezi
09/28/2021, 1:32 PMAlfie
09/28/2021, 3:40 PMKevin Weiler
09/28/2021, 9:32 PMslack_notifier
function posts a status update for
presumably these 2 things are related.
I tried following this ticket and setting the PREFECT__SERVER__UI__ENDPOINT
env var in the environment where the UI container runs. I believe that is what is implied by:
[server]
[server.ui]
endpoint = "..."
In the ticket.
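For reference, Prefect's config loader maps nested TOML keys to environment variables by joining the path segments with double underscores, which is how the `[server.ui] endpoint` setting above corresponds to `PREFECT__SERVER__UI__ENDPOINT`. A tiny sketch of the convention (the helper function itself is illustrative, not Prefect API):

```python
# Sketch of Prefect's TOML-key -> env-var naming convention: section names
# and the key are upper-cased and joined with "__" under the PREFECT prefix.
def toml_path_to_env_var(*path: str, prefix: str = "PREFECT") -> str:
    return "__".join([prefix, *[segment.upper() for segment in path]])

print(toml_path_to_env_var("server", "ui", "endpoint"))
```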
But that didn’t correct the issue. Do I need to set this env var in a different component - the graphql component perhaps?Fanglin
09/29/2021, 4:42 PM"unauthorized. invalid prefect cloud api key"
Any ideas why?Omer Sen
09/30/2021, 6:56 AMOmer Sen
09/30/2021, 6:56 AMOmer Sen
09/30/2021, 6:56 AMThe following error messages were provided by the GraphQL server:
INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
"input.tenant_id"; Expected non-nullable type UUID! not to be null.
Omer Sen
09/30/2021, 6:56 AMFabrice Toussaint
09/30/2021, 7:32 AMDimas
09/30/2021, 8:00 AM[server]
host = "http://mypublicIP"
port = "4200"
host_port = "4200"
endpoint = "${server.host}:${server.port}"

[server.ui]
host = "http://mypublicIP"
port = "8080"
host_port = "8080"
host_ip = "mypublicIP"
endpoint = "${server.ui.host}:${server.ui.port}"
apollo_url = "http://mypublicIP:4200/graphql"
but somehow, Prefect comes up with this error:
ERROR: for tmp_ui_1 Cannot start service ui: driver failed programming external connectivity on endpoint tmp_ui_1 (1f380ce2f089ba70a8c422d2302bb447fef87c48bdd81d2dd93898b661c661a0): Error starting userland proxy: listen tcp4 xx.xx.xx.xx:8080: bind: cannot assign requested address
but if I reset the .toml file back to localhost, it works with no error.
Can anyone give me a clue on how to run the Prefect UI with my public IP?
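A likely cause (an assumption, since it depends on the VM's networking): on most cloud VMs the public IP is NAT-ed and not actually assigned to any local interface, so Docker cannot bind a port to it, which is exactly the `bind: cannot assign requested address` error above. A sketch that binds locally while using the public IP only in the browser-facing URL:

```toml
[server.ui]
# host_ip controls which address docker-compose publishes the UI port on;
# 0.0.0.0 binds all interfaces instead of an IP the host doesn't own
host_ip = "0.0.0.0"
# the browser (not the container) resolves apollo_url, so the public IP
# belongs here
apollo_url = "http://mypublicIP:4200/graphql"
```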
this is my environment:
• Python 3.6.8
• Docker version 20.10.8, build 3967b7d
• docker-compose version 1.29.2, build unknown
• docker-py version: 5.0.2
• CPython version: 3.6.8
• OpenSSL version: OpenSSL 1.1.1g FIPS 21 Apr 2020
• prefect version: 0.15.6Alexander van Eck
09/30/2021, 1:21 PMVildMedPap
09/30/2021, 1:30 PMdocker build -t my_processor .
docker run -v ${PWD}/data:/app/data my_processor
Here the raw data (the csv file) is located in /data/raw and the processed file (the json file) will be located by the container in /data/processed.
What we wish to achieve
- 3 tasks
- Task #1) Fetch data from source (S3) and temporarily store it in /data/raw
- Task #2) Run the container with bind mount to /data directory
- Task #3) Push data from /data/processed to destination (S3)
Our setup
- The prefect server is hosted locally on an EC2 server
Dockerfile used to build the image
FROM python:3.7-slim
WORKDIR /app
COPY . .
RUN pip install .
CMD ["python3", "preprocess.py"]
Skeleton for our Flow flow.py
from prefect import Flow, task
...
@task(name="Get data")
def get_data_from_s3():
"""Retrieve csv from S3 and saves as /data/raw/file.csv"""
pass
@task(name="Process data")
def process_through_docker_image():
"""Start docker container which
- reads the csv-file
- processes the data
- writes as json
"""
pass
@task(name="Push data")
def upload_to_s3():
"""Push data to S3"""
pass
with Flow("foo") as f:
# First get data: get_data_from_s3()
...
# Then process the data: process_through_docker_image()
...
# Finally, upload the processed data to S3: upload_to_s3()
...
f.register()
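Task #2 above can use the prebuilt image as-is. One option (a sketch, not the only route; Prefect 0.x also ships Docker tasks under `prefect.tasks.docker`) is to shell out to `docker run` from inside the task, assuming Docker and the image are available on the machine where the flow actually executes:

```python
import subprocess
from typing import List

def docker_run_command(image: str, host_data_dir: str) -> List[str]:
    # Equivalent of: docker run --rm -v ${PWD}/data:/app/data my_processor
    return ["docker", "run", "--rm", "-v", f"{host_data_dir}:/app/data", image]

def run_processor_container(host_data_dir: str) -> None:
    # check=True makes a non-zero container exit code fail the task run
    subprocess.run(docker_run_command("my_processor", host_data_dir), check=True)

print(docker_run_command("my_processor", "/srv/flows/data"))
```

The caveat is that the bind-mounted `/data` directory must live on the same machine for all three tasks, which constrains which agent/executor runs the flow.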
Questions:
1. Is Prefect intended to be used this way?
2. Is it possible to use an already built Docker image in the way described above?
3. If so, how to do it? 😬William Burdett
09/30/2021, 4:14 PMHey everyone, I have an issue. We're getting this error: "Unexpected error: ClientError(\'An error occurred (ThrottlingException) when calling the DescribeTasks operation (reached max retries: 4): Rate exceeded\')"
I tried to bump the version of dask to 2021.9.1 and it is giving me back this error:
ImportError: cannot import name 'apply' from 'dask.compatibility' (/usr/local/lib/python3.9/site-packages/dask/compatibility.py)
I have no idea how to handle thisJulio Venegas
09/30/2021, 5:53 PMprefect server start --postgres-url postgres://<username>:<password>@hostname:<port>/<dbname>
because the @ in the username makes the parsing process think that AZURE_INSTANCE_NAME is the host. Escaping the @ in the username has not worked. In the past the quick fix someone from the Prefect team recommended to me that actually solved the issue, was to install https://github.com/PrefectHQ/server, pass the postgres connection string to the prefect-server config with export PREFECT_SERVER__DATABASE__CONNECTION_URL=CONNECTION_STRING and then run prefect-server database upgrade
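Backslash-escaping won't survive URL parsing; the URL-native way to hide the `@` in the username is percent-encoding it as `%40` (per RFC 3986), which the stdlib can do. The connection details below are placeholders:

```python
from urllib.parse import quote

# Azure Postgres usernames look like user@instance; the raw "@" makes URL
# parsers treat "instance" as the host, so percent-encode the userinfo parts.
username = quote("myuser@AZURE_INSTANCE_NAME", safe="")
password = quote("p@ssw:rd", safe="")
url = f"postgres://{username}:{password}@hostname:5432/dbname"
print(url)
```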
. I’m currently doing 1) prefect server start --external-postgres --no-upgrade
to start the Docker containers, followed by prefect-server database upgrade
and the current error I’m getting from the last command is Error: HTTPConnectionPool(host='localhost', port=3000): Max retries exceeded with url: /v1/query (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6aaf865730>: Failed to establish a new connection: [Errno 111] Connection refused'))
which based on the port=3000 is related to Hasura. Any advice highly appreciated!!Sam Werbalowsky
09/30/2021, 8:34 PMPREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
with dask-gateway? We use a dynamic cluster per flow, rather than static, so we can’t fill in an exact address, as it depends on the cluster that has spun up as part of the flow running.
My goal is to have a helper file that configures the default executor and runconfig for a user so they don’t have to worry about their flow’s configuration.Nivi Mukka
09/30/2021, 10:37 PMdask-gateway==0.9.0
dask==2020.12.0
distributed==2020.12.0
prefect==0.14.1
click==7.1.2
Constantly seeing this warning on Dask worker logs after a data read from BigQuery. The data read from BigQuery is happening on 15 different workers but this warning shows up only on 2-3 workers and then the Prefect Flow takes about an hour to proceed from there. Any insight into how this can be resolved?Thomas Fredriksen
10/05/2021, 9:26 AMgevent
package. gevent
will patch the SSL-library and requires the following lines to be added before ssl
is imported in order to avoid an infinite import recursion:
import gevent.monkey
gevent.monkey.patch_all()
I am trying to deploy my flow to a Prefect Server instance, but it seems I am not able to monkey-patch early enough, as I am seeing this infinite recursion error in one of my tasks.
Is there any way of forcing the server to do this monkey-patch before executing the flow?Scarlett King
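One possible early-patch hook, assuming the execution image can be modified: Python imports any module named `sitecustomize` found on `sys.path` at interpreter startup, before the flow or `ssl` is imported, so baking a `sitecustomize.py` containing the `patch_all()` lines into the image would patch early enough. A stdlib-only demonstration of the hook (an env var stands in for the gevent patch):

```python
import os
import subprocess
import sys
import tempfile

# Write a sitecustomize.py; setting an env var stands in for
# gevent.monkey.patch_all(), which would go here in the real image.
d = tempfile.mkdtemp()
with open(os.path.join(d, "sitecustomize.py"), "w") as f:
    f.write("import os; os.environ['PATCHED_EARLY'] = '1'\n")

# A fresh interpreter with that directory on PYTHONPATH runs it at startup,
# before the -c code (i.e. before any user imports) executes.
env = dict(os.environ, PYTHONPATH=d)
out = subprocess.check_output(
    [sys.executable, "-c", "import os; print(os.environ['PATCHED_EARLY'])"],
    env=env,
)
print(out.decode().strip())
```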
10/05/2021, 2:53 PMAlejandro A
10/05/2021, 5:10 PM2j
10/06/2021, 2:27 AMAZURE_STORAGE_CONNECTION_STRING
for access to blob storage. I'm not seeing anything baked into the helm values.yaml (though maybe something in prefectConfig
could work?). I was thinking of attaching a k8s secret after the helm deployment? But I think it needs to be set on the job that the agent kicks off. And I should be able to set the env var as-is, or in secret form like PREFECT__CONTEXT__SECRETS__AZURE_STORAGE_CONNECTION_STRING
?Denis Rogovskiy
10/06/2021, 11:18 AMAleksandr Glushko
10/06/2021, 3:03 PMgithub_repo/
├── flow.py
├── flow_dependencies/sub_task.py
how can one import flow_dependencies.sub_task
in flow.py
if I use flow.storage = Git(...)
TOMAS IGNACIO ACUÑA RUZ
10/06/2021, 3:10 PMScarlett King
10/06/2021, 3:25 PMTonin Gega
10/06/2021, 4:29 PMFailed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/tonin'")
not really sure what i’m missing, that’s not a module in my flow. If I register my flow from within the VM the flow runs fine.Kevin Kho
10/06/2021, 4:37 PMMarco Fontana
10/07/2021, 12:45 PMKevin Kho
10/07/2021, 1:56 PM