Aram Panasenco
03/03/2022, 6:16 PM
Ifeanyi Okwuchi
03/03/2022, 8:03 PM
create_flow_run.map() with wait_for_flow_run() and get_task_run_result(), but I'm getting some errors. I'm not sure if I'm doing this correctly.
Seth Just
03/03/2022, 8:15 PM
ECSRun with a LocalDaskExecutor, and I'm unable to find any way to debug or diagnose why the task isn't ever executed.
Sam Werbalowsky
03/03/2022, 9:40 PM
param=Parameter("myvalue", default="myvalue"). I want to use it in a variety of tasks, say…
execute = execute_file(f"sql/{param}.sql")
upload = upload_file(f"{param}.csv")
Is there a way to do this without constructing a task for each input?
YD
03/03/2022, 9:53 PM
task_1 return a df and I pass the df as an input to task_2, does the df physically go to the Prefect server?
The issue I have is that it looks like it is getting stuck when trying to return the df and pass it to the next task.
Is there a better way to pass a df from one task to the next?
@task()
def task_1():
    df = ....
    return df
@task()
def task_2(df):
    df2 = df ....
    return df2
def main():
    with Flow("voc_sentiment") as flow:
        df = task_1()
        df2 = task_2(df)
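One alternative to returning the table itself is to have the first task write it to disk and return only the path. The sketch below shows that pattern with the standard library only, so the function bodies, file name, and sample rows are all hypothetical stand-ins, and in a flow each function would still carry the @task decorator. It assumes both tasks can see the same filesystem, and it is not known whether this addresses the hang described above.

```python
import csv
import tempfile
from pathlib import Path


# Hypothetical stand-in for task_1: persist the rows, hand back a path.
def task_1(out_dir: Path) -> str:
    rows = [{"id": 1, "text": "good"}, {"id": 2, "text": "bad"}]
    path = out_dir / "task_1_output.csv"
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "text"])
        writer.writeheader()
        writer.writerows(rows)
    return str(path)


# Hypothetical stand-in for task_2: reload the rows from the path it receives.
def task_2(path: str) -> list:
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    for row in rows:
        # Derive a new column, mirroring "df2 = df ...." in the original.
        row["length"] = len(row["text"])
    return rows


with tempfile.TemporaryDirectory() as tmp:
    result = task_2(task_1(Path(tmp)))
```

Only a short string travels between the two steps here. If the tasks run on different machines, the path would need to point at shared storage instead of a local temporary directory.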
Daniel Saxton
03/04/2022, 12:50 AM
--env flag, but is there also something like an --env-file flag if you have several environment variables defined in a file (kind of like you have with docker-compose)?
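Whether an --env-file flag exists is not settled in this message. As a possible workaround, a small script can expand a KEY=VALUE file into repeated --env flags. This is only a sketch: the helper name env_file_to_flags and the sample contents are hypothetical, and it does not handle quoting or "export" prefixes.

```python
from typing import List


def env_file_to_flags(text: str) -> List[str]:
    """Turn KEY=VALUE lines into repeated --env flags, skipping blanks and comments."""
    flags: List[str] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a KEY=VALUE line, ignore it
        flags += ["--env", f"{key.strip()}={value.strip()}"]
    return flags


# Hypothetical file contents, for illustration only.
sample = "# settings\nAPI_URL=http://example.test\n\nDEBUG=1\n"
flags = env_file_to_flags(sample)
```

The resulting list can be appended to whatever command already accepts --env, for example through subprocess.run.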
https://docs.prefect.io/orchestration/getting-started/flow-configs.html#configure-environment-variables
Matt Alhonte
03/04/2022, 1:22 AM
set_schedule_inactive). It lets me read (I grab all the flows from a given project), but the mutate gives me this error (the query works when I go to the Interactive tab on the GUI, btw):
AuthorizationError([{'path': ['flow'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}])
Sen
03/04/2022, 5:30 AM
https://pasteboard.co/kpsNsqpN3aXP.png
Thomas Opsomer
03/04/2022, 4:22 PM
kevin
03/04/2022, 5:14 PM
Ling Chen
03/04/2022, 5:46 PM
Ken Nguyen
03/04/2022, 6:15 PM
FROM ubuntu:18.04
COPY requirements.txt /requirements.txt
COPY google_secret.json $HOME/.config/gspread_pandas/google_secret.json
ENV PATH="/root/miniconda3/bin:$PATH"
ARG PATH="/root/miniconda3/bin:$PATH"
RUN apt-get update
RUN apt-get install -y wget && rm -rf /var/lib/apt/lists/*
RUN wget \
    https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
    && mkdir /root/.conda \
    && bash Miniconda3-latest-Linux-x86_64.sh -b \
    && rm -f Miniconda3-latest-Linux-x86_64.sh
RUN conda install pip
RUN pip install -r requirements.txt
RUN conda install -c conda-forge theano-pymc -y
Ken Nguyen
03/04/2022, 6:39 PM
@task decorator on a function imported from a library (as opposed to one I defined)?
Jack Chang
03/04/2022, 6:40 PM
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Can anyone point me to an example where I can make it iterable?
Hedgar
03/04/2022, 6:59 PM
Daniel Saxton
03/04/2022, 7:06 PM
Leon Kozlowski
03/04/2022, 8:40 PM
Henry
03/04/2022, 8:55 PM
input_a = Parameter('a')
@task
def abc(b, c):
    return a + b + c
with Flow("abc_flow") as flow:
    # in context
    result = abc()
Jared Teerlink
03/04/2022, 9:21 PM
Martha Edwards
03/04/2022, 9:58 PM
Kevin Kho
Kevin Kho
Hans Lellelid
03/05/2022, 8:39 PM
Daniel Ross
03/05/2022, 8:50 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f1911c36d90>: Failed to establish a new connection: [Errno 111] Connection refused'))
If I look at the task itself, I can see that the environment variable for PREFECT__CLOUD__API is set to http://127.0.0.1:4200. So this seems like the problem.
The host it's trying to connect to is clearly wrong (since the server itself is running on an EC2 instance). So I've adjusted my ~/.prefect/config.toml to look like this:
host_ip = "my.ip.goes.here"
host_port = "4200"
host = "http://${server.host_ip}"
port = "4200"
endpoint = "${server.host}:${server.port}"
[server.ui]
apollo_url = "http://my.ip.goes.here:4200/graphql"
[cloud]
api = "${${backend}.endpoint}"
endpoint = "https://api.prefect.io"
graphql = "${cloud.api}/graphql"
No luck.
So I added the PREFECT__CLOUD__API definition to my environment variables in the container definition. Still no luck. However, when I look at the task definition, I can see the correct (or at least intended) PREFECT__CLOUD__API environment variable there. But the variable in the task is still set to http://127.0.0.1:4200, and the problem persists!
I am pretty stuck on this, and hoping that someone here has a line of sight to the solution. (This all worked without much configuration previously ... which now seems weird.)
All help appreciated!
Jacqueline Riley Garrahan
03/06/2022, 6:06 AM
apiVersion: batch/v1
kind: Job
metadata:
  name: prefect-job
  labels: {}
spec:
  template:
    metadata:
      labels: {}
    spec:
      containers:
        - name: flow
          image: prefecthq/prefect:latest
          imagePullPolicy: IfNotPresent
          ...
          volumeMounts:
            - name: filesystem-dir
              mountPath: /job-files
      volumes:
        - name: filesystem-dir
          hostPath:
            # directory location on host
            path: /path/to/my/dir
            # this field is optional
            type: Directory
I'm unable to mount the path, which I believe is because the volume is not available on the agent. Is there a straightforward way for me to mount a volume to the agent in the helm chart deployment?
Khen Price
03/06/2022, 10:43 AM
Do mapped tasks run concurrently? Or does concurrency only happen when a dask executor is used?
Thanks!
Dekel R
03/06/2022, 8:43 PM
comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
And I get the following error (it works when run outside of a task context):
TypeError: unhashable type: 'ResultSet'
For now I’m running this locally on my Mac, but it will eventually run on Vertex AI.
I’d appreciate any help since I’m currently stuck.
Thanks.
Bihag Kashikar
03/07/2022, 12:55 AM
Sen
03/07/2022, 12:39 PM
Emma Rizzi
03/07/2022, 1:08 PM