Ian Singer — 01/06/2022, 3:39 PM

Eric Richard — 01/06/2022, 3:45 PM

Carrie Bohinc — 01/06/2022, 4:33 PM

Vaibhav Shetye — 01/06/2022, 5:55 PM

Jason Noxon — 01/06/2022, 6:57 PM

Gui Pires — 01/06/2022, 7:24 PM

Isaac Brodsky — 01/06/2022, 7:36 PM
RuntimeError: Task <Task pending name='Task-2051' coro=<FutureState.wait() running at /usr/local/lib/python3.8/site-packages/distributed/client.py:482> cb=[WaitIterator._done_callback()]> got Future <Future pending> attached to a different loop
As far as I can tell I did not change anything about how work was submitted to Dask, so I am wondering if this is an intermittent issue related to where the Prefect task is running? My Prefect task is wrapped in a with worker_client() block.
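A minimal sketch of the pattern being described, assuming Prefect 1.x running tasks on a Dask cluster; fan_out and process_item are illustrative names, not taken from the message above:

from distributed import worker_client
from prefect import task

def process_item(x):
    return 2 * x  # placeholder for the real per-item work

@task
def fan_out(items):
    # The task itself already runs on a Dask worker; worker_client() yields
    # a client connected to the same cluster, so the task can submit further
    # work without starting a second client/event loop.
    with worker_client() as client:
        futures = client.map(process_item, items)
        return client.gather(futures)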
Danny Vilela — 01/06/2022, 8:24 PM
I have a task Foo that maps over a task input (a list) with 3 values. Sometimes, though, some of those values aren't applicable (for example, a requested date that doesn't exist in our database), so I thought of raising prefect.engine.signals.SKIP to note that (1) this dynamic task input doesn't apply, but also that (2) it wasn't a failure, per se.
That said, I'm noticing that a task Bar that is directly downstream of Foo also skips when Foo skips any of its mapped tasks. It seems this is intended, but is there a trigger I should use to say "it's fine if any number of these mapped tasks fails"? Bar has other upstream tasks, but I wouldn't want those to be considered.
Does skip_on_upstream_skip apply here? Should I configure Bar such that skip_on_upstream_skip=False? From the docs here.
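A minimal sketch of that setup, assuming Prefect 1.x; date_exists and load_date are hypothetical stand-ins for the real helpers, and the all_finished trigger is one way to express "run regardless of skipped or failed mapped runs":

from prefect import Flow, task
from prefect.engine.signals import SKIP
from prefect.triggers import all_finished

def date_exists(date):
    return date != "2022-01-02"  # stand-in for the database check

def load_date(date):
    return f"rows for {date}"    # stand-in for the real query

@task
def foo(date):
    if not date_exists(date):
        raise SKIP(f"no data for {date}")
    return load_date(date)

# all_finished fires once every upstream run reaches a finished state
# (success, skipped, or failed); skip_on_upstream_skip=False keeps the
# skip from cascading into bar.
@task(trigger=all_finished, skip_on_upstream_skip=False)
def bar(results):
    # (assumption) skipped mapped runs show up here as None results
    return [r for r in results if r is not None]

with Flow("foo-bar") as flow:
    bar(foo.map(["2022-01-01", "2022-01-02", "2022-01-03"]))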
John Jacoby — 01/06/2022, 8:48 PM
with Flow(constants['name'], result=result) as flow:
John Jacoby — 01/06/2022, 8:49 PM

John Jacoby — 01/06/2022, 8:49 PM
@task(target='{task_name}/{scan_id}', checkpoint=True, nout=2)
def bourget2bids(participant_id: str, scan_id: str, study_constants):
    # fetch the scan from Bourget into a temporary BIDS directory
    tmp_BIDS_dir, bourget_path = get_from_bourget(participant_id, scan_id, study_constants['name'], study_constants['scripts_dir'])
    add_ASL_metadata(scan_id, study_constants['name'])
    # copy into the study data folder; two return values to match nout=2
    BIDS_path = copy_to_study_data_folder(participant_id, scan_id, tmp_BIDS_dir, study_constants['name'], study_constants['data_dir'])
    return bourget_path, BIDS_path
John Jacoby — 01/06/2022, 8:49 PM

John Jacoby — 01/06/2022, 8:49 PM

John Jacoby — 01/06/2022, 8:50 PM
Danny Vilela — 01/06/2022, 8:54 PM
If a task raises prefect.engine.signals.SKIP, does it need to return a SKIP as its result? Can I configure the (mapped or mapper) task such that it only returns a result (say, a string) if the task did not skip? Or [does/should] it have to return a SKIP?
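A short sketch of the distinction, again assuming Prefect 1.x: raising SKIP ends the run immediately, so the task body never returns a SKIP object; it either raises or returns its normal result:

from prefect import task
from prefect.engine.signals import SKIP

@task
def maybe_load(date):
    if date is None:
        # raising ends the run in a Skipped state; nothing is returned
        raise SKIP("not applicable")
    # otherwise return the ordinary result (a string here)
    return f"s3://bucket/{date}.csv"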
KhTan — 01/06/2022, 9:40 PM

KhTan — 01/06/2022, 11:58 PM

Enda Peng — 01/07/2022, 2:40 AM

Shivam Bhatia — 01/07/2022, 10:07 AM
Gabriel Milan — 01/07/2022, 12:54 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7efe788176d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
But the weird thing is that if I open a terminal inside the same job pod and curl prefect-apollo.prefect:4200, I can successfully get an answer from the Apollo server.
Has anyone seen anything similar before? Or tried Istio with Prefect on Kubernetes?
Dave — 01/07/2022, 2:21 PM

Jason Motley — 01/07/2022, 4:13 PM

Yusuf Khan — 01/07/2022, 5:25 PM
self?
Danny Vilela — 01/07/2022, 9:06 PM
Is it possible to pass mapped(flatten(nested_list_data)) to a task?
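A small sketch of what that could look like, assuming Prefect 1.x, where flatten un-nests a mapped input and mapped is the annotation equivalent of calling .map:

from prefect import Flow, flatten, mapped, task

@task
def nested_list_data():
    return [[1, 2], [3], [4, 5]]

@task
def double(x):
    return 2 * x

with Flow("flatten-demo") as flow:
    nested = nested_list_data()
    # double maps over the flattened elements 1..5
    doubled = double.map(flatten(nested))
    # annotation form of the same thing, as asked above
    also_doubled = double(mapped(flatten(nested)))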
Greg Adams — 01/07/2022, 9:14 PM

Ashton — 01/08/2022, 1:36 AM
Stephen Herron — 01/08/2022, 7:00 AM
with Flow(FLOW_NAME) as flow:
    event_dates = get_event_dates()
    s3_keys = generate_s3_key.map(suffix=event_dates)
    event_file_data = unload_data_to_s3.map(s3_keys, event_dates)
    update_log = update_log.map(event_dates, upstream_tasks=[event_file_data])
    update_snowflake = update_snowflake.map(s3_keys, event_dates, upstream_tasks=[update_log])
The problem is that when I schedule this in Cloud (with a local agent), even though all the mapped tasks complete, the run doesn't seem to terminate. Am I missing something?
Aqib Fayyaz — 01/08/2022, 1:54 PM
When building the image I get Unable to locate package openjdk-8-jdk. Is the issue because of the base image? In our other Dockerfiles where Spark runs we use ubuntu 20.04 as the base image, but for Prefect the base is the prefect image. Below is the Dockerfile:
FROM prefecthq/prefect:0.15.6-python3.8

# for spark
ENV JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
ENV SPARK_HOME="/spark/spark-3.1.2-bin-hadoop3.2/"
ENV PYTHONPATH="/spark/spark-3.1.2-bin-hadoop3.2/python:$PYTHONPATH"
ENV PYSPARK_PYTHON="python3"
ENV PATH="$PATH:/spark/spark-3.1.2-bin-hadoop3.2/bin"
ENV PATH="$PATH:$JAVA_HOME"
ENV PATH="$PATH:$JAVA_HOME/bin"
ENV PATH="$PATH:$JAVA_HOME/jre/bin"
ENV SPARK_LOCAL_IP="127.0.0.1"
WORKDIR /
COPY . /
RUN apt-get update && \
    apt-get install -y \
    openjdk-8-jdk \
    python3-pip
ADD https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz spark.tgz
RUN mkdir -p spark && \
    tar -zxvf spark.tgz -C spark/ && \
    rm spark.tgz

# for prefect
RUN pip install feast feast-postgres sqlalchemy google-auth scikit-learn
RUN pip install feast[gcp]
RUN pip install --upgrade google-cloud
RUN pip install --upgrade google-cloud-bigquery
RUN pip install --upgrade google-cloud-storage
WORKDIR /opt/prefect
COPY flow_utilities/ /opt/prefect/flow_utilities/
COPY flow_utilities_bigQ_Datastore/ /opt/prefect/flow_utilities_bigQ_Datastore/
COPY setup.py /opt/prefect/setup.py
COPY .feastignore /opt/prefect/.feastignore
RUN pip install .
vawati — 01/08/2022, 5:47 PM
I have a task that is parametrized by a date (think of a task that downloads data corresponding to a date).
I would like to have this task run once a day for all days, starting from a date in the past; let's call it start_time.
How do I achieve this with Prefect?
The closest I've found is this: https://github.com/PrefectHQ/prefect/issues/1179
import pendulum
import prefect
from prefect import Parameter

current_time = Parameter("timestamp", default=None)

@prefect.task
def do_something_time_specific(current_time):
    if current_time is None:
        current_time = prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # does something dealing with time
In words: the task depends on a date, but if a date isn't passed, it uses the scheduled_start_time.
This seems to have a problem. Let's suppose it's now 2022-01-01 and Prefect has scheduled tasks for 2022-01-02, 2022-01-03, etc.
Now my computer is off for a few days and I turn it on again on 2022-01-04. What happened to the runs for the two dates that were skipped? Do they both run on 2022-01-04 but with scheduled_start_time values of 2022-01-02 and 2022-01-03? If that's the case, this solution does the right thing. Or do they both run on 2022-01-04 with scheduled_start_time equal to 2022-01-04 for both? (In which case this solution doesn't work.)
Mike Lev — 01/09/2022, 2:47 PM

Mike Lev — 01/09/2022, 2:47 PM

Kevin Kho — 01/09/2022, 3:00 PM

Andrew Black — 01/09/2022, 3:13 PM