Danny Vilela
01/06/2022, 8:24 PM
I have a task Foo that maps over a task input (a list) with 3 values. Sometimes, though, some of those values aren't applicable (for example, if it's a requested date that doesn't exist in our database), so I thought of raising prefect.engine.signals.SKIP to note (1) that this dynamic task input doesn't apply, but also (2) that it wasn't a failure, per se.
That said, I'm noticing that a task Bar that is directly downstream of Foo also skips when Foo skips any of its mapped tasks. It seems this is intended, but is there a trigger I should set to note that "it's fine if any number of these mapped tasks fails"? Bar has other upstream tasks, but I wouldn't want those to be considered.
Does skip_on_upstream_skip apply here? Should I configure Bar with skip_on_upstream_skip=False? From the docs here.
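A minimal sketch of that configuration, assuming Prefect 1.x; the task bodies and the None check below are illustrative stand-ins for the actual database lookup, not from the thread:

from prefect import Flow, task
from prefect.engine.signals import SKIP

@task
def foo(date_key):
    # Illustrative stand-in for "requested date doesn't exist in our database".
    if date_key is None:
        raise SKIP("Date not in database; skipped, not failed.")
    return f"rows-for-{date_key}"

# skip_on_upstream_skip=False lets bar run even when some of foo's mapped
# children end in a Skipped state; their results typically arrive as None.
@task(skip_on_upstream_skip=False)
def bar(results):
    return [r for r in results if r is not None]

with Flow("skip-demo") as flow:
    bar(foo.map(["2022-01-01", None, "2022-01-03"]))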
John Jacoby
01/06/2022, 8:48 PM
with Flow(constants['name'], result=result) as flow:
John Jacoby
01/06/2022, 8:49 PM
@task(target='{task_name}/{scan_id}', checkpoint=True, nout=2)
def bourget2bids(participant_id: str, scan_id: str, study_constants):
    tmp_BIDS_dir, bourget_path = get_from_bourget(participant_id, scan_id, study_constants['name'], study_constants['scripts_dir'])
    add_ASL_metadata(scan_id, study_constants['name'])
    BIDS_path = copy_to_study_data_folder(participant_id, scan_id, tmp_BIDS_dir, study_constants['name'], study_constants['data_dir'])
    return bourget_path, BIDS_path
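For reference, a minimal sketch of the target/checkpoint pattern the snippet above relies on, assuming Prefect 1.x with a LocalResult (the names here are illustrative): with checkpoint=True and a target set, the target template is formatted from context and task inputs, and an existing result at that location marks the run Cached instead of re-executing the task body.

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(target="{task_name}/{scan_id}", checkpoint=True)
def fetch_scan(scan_id: str) -> str:
    # Only runs when no result exists yet at ./results/fetch_scan/<scan_id>.
    return f"downloaded-{scan_id}"

with Flow("target-demo", result=LocalResult(dir="./results")) as flow:
    fetch_scan("scan-001")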
Danny Vilela
01/06/2022, 8:54 PM
When a task raises prefect.engine.signals.SKIP, does it need to return a SKIP as its result? Can I configure the (mapped or mapper) task such that it only returns a result (say, a string) if the task did not skip? Or [does/should] it have to return a SKIP?
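A minimal sketch of the raise-vs-return distinction being asked about, assuming Prefect 1.x (date_exists is a hypothetical predicate, not a Prefect API): SKIP is raised rather than returned, so the return statement only executes on the non-skip path and the task's result is an ordinary value.

from prefect import task
from prefect.engine.signals import SKIP

@task
def lookup(date_key: str) -> str:
    if not date_exists(date_key):  # hypothetical predicate
        raise SKIP(f"{date_key} not found")  # raising ends the run here
    return f"s3://bucket/{date_key}.parquet"  # result only when not skipped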
Gabriel Milan
01/07/2022, 12:54 PM
requests.exceptions.ConnectionError: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7efe788176d0>: Failed to establish a new connection: [Errno 111] Connection refused'))
But the weird thing is that if I open a terminal inside the same job pod and curl prefect-apollo.prefect:4200, I can successfully get an answer from the Apollo server.
Has anyone seen anything similar before? Or has anyone tried Istio with Prefect on Kubernetes?
Yusuf Khan
01/07/2022, 5:25 PM
self?
Danny Vilela
01/07/2022, 9:06 PM
Can I pass mapped(flatten(nested_list_data))
to a task?
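A minimal sketch of flatten with mapping, assuming Prefect 1.x; the tasks here are illustrative. Both the .map(flatten(...)) form and passing mapped(flatten(...)) into an ordinary task call create one mapped child per leaf element:

from prefect import Flow, task, flatten, mapped

@task
def nested_list_data():
    return [[1, 2], [3], [4, 5, 6]]

@task
def double(x):
    return 2 * x

with Flow("flatten-demo") as flow:
    data = nested_list_data()
    # One child task per leaf element (six children here), not per sub-list.
    via_map = double.map(flatten(data))
    # Equivalent functional-call form using mapped(), as in the question.
    via_mapped = double(mapped(flatten(data)))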
Stephen Herron
01/08/2022, 7:00 AM
with Flow(FLOW_NAME) as flow:
    event_dates = get_event_dates()
    s3_keys = generate_s3_key.map(suffix=event_dates)
    event_file_data = unload_data_to_s3.map(s3_keys, event_dates)
    update_log = update_log.map(event_dates, upstream_tasks=[event_file_data])
    update_snowflake = update_snowflake.map(s3_keys, event_dates, upstream_tasks=[update_log])
The problem is that when I schedule this in Cloud (with a local agent), even though all the mapped tasks complete, the run doesn't seem to terminate. Am I missing something?
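This is not necessarily the cause of the hang, but one hedged observation on the snippet above: update_log = update_log.map(...) rebinds the task's name to its mapped result, which can make later references ambiguous. A sketch that keeps the task objects and their bound results under distinct names, assuming the same tasks:

with Flow(FLOW_NAME) as flow:
    event_dates = get_event_dates()
    s3_keys = generate_s3_key.map(suffix=event_dates)
    event_file_data = unload_data_to_s3.map(s3_keys, event_dates)
    # Bind mapped results to new names so the task objects stay intact.
    log_updates = update_log.map(event_dates, upstream_tasks=[event_file_data])
    update_snowflake.map(s3_keys, event_dates, upstream_tasks=[log_updates])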
Aqib Fayyaz
01/08/2022, 1:54 PM
I'm getting the error Unable to locate package openjdk-8-jdk. Is the issue because of the base image? In our other Dockerfiles where Spark runs, we use ubuntu 20.04 as the base image, but for Prefect the base image is the Prefect image. Below is the Dockerfile:
FROM prefecthq/prefect:0.15.6-python3.8
# for spark
ENV JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
ENV SPARK_HOME="/spark/spark-3.1.2-bin-hadoop3.2/"
ENV PYTHONPATH="/spark/spark-3.1.2-bin-hadoop3.2/python:$PYTHONPATH"
ENV PYSPARK_PYTHON="python3"
ENV PATH="$PATH:/spark/spark-3.1.2-bin-hadoop3.2/bin"
ENV PATH="$PATH:$JAVA_HOME"
ENV PATH="$PATH:$JAVA_HOME/bin"
ENV PATH="$PATH:$JAVA_HOME/jre/bin"
ENV SPARK_LOCAL_IP="127.0.0.1"
WORKDIR /
COPY . /
RUN apt-get update && \
    apt-get install -y \
    openjdk-8-jdk \
    python3-pip
ADD https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz spark.tgz
RUN mkdir -p spark && \
    tar -zxvf spark.tgz -C spark/ && \
    rm spark.tgz
# for prefect
RUN pip install feast feast-postgres sqlalchemy google-auth scikit-learn
RUN pip install feast[gcp]
RUN pip install --upgrade google-cloud
RUN pip install --upgrade google-cloud-bigquery
RUN pip install --upgrade google-cloud-storage
WORKDIR /opt/prefect
COPY flow_utilities/ /opt/prefect/flow_utilities/
COPY flow_utilities_bigQ_Datastore/ /opt/prefect/flow_utilities_bigQ_Datastore/
COPY setup.py /opt/prefect/setup.py
COPY .feastignore /opt/prefect/.feastignore
RUN pip install .
vawati
01/08/2022, 5:47 PM
I have a task that depends on a date (think of a task that downloads data corresponding to a date).
I would like to have this task run once a day for all days, starting from a date in the past; let's call it start_time.
How do I achieve this with Prefect?
The closest I've found is this: https://github.com/PrefectHQ/prefect/issues/1179
import pendulum
import prefect
from prefect import Parameter

current_time = Parameter("timestamp", default=None)

@prefect.task
def do_something_time_specific(current_time):
    # Fall back to the run's scheduled time when no date is passed in.
    if current_time is None:
        current_time = prefect.context.get("scheduled_start_time")
    if isinstance(current_time, str):
        current_time = pendulum.parse(current_time)
    # does something dealing with time
In words: the task depends on a date, but if a date isn't passed, it uses the scheduled_start_time.
This seems to have a problem. Let's suppose it's now 2022-01-01 and Prefect has scheduled runs for 2022-01-02, 2022-01-03, etc.
Now my computer is off for a few days and I turn it on again on 2022-01-04. What happened to the runs for the two dates that were skipped? Do they both run on 2022-01-04, but with scheduled_start_time of 2022-01-02 and 2022-01-03 respectively? If that's the case, this solution does the right thing. Or do they both run on 2022-01-04 with scheduled_start_time equal to 2022-01-04 for both? (In which case this solution doesn't work.)
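A hedged sketch of one way to make missed days explicit rather than relying on scheduled_start_time: create one run per historical date, so each run pins its date through the "timestamp" Parameter. This assumes Prefect 1.x against Cloud/Server; the flow ID is a hypothetical placeholder:

import pendulum
from prefect import Client

client = Client()
start = pendulum.date(2022, 1, 1)   # the start_time from the question
today = pendulum.today().date()

# One run per day in the backfill window; each run receives its date via
# the "timestamp" Parameter, so the result does not depend on when the
# run actually executes.
day = start
while day <= today:
    client.create_flow_run(
        flow_id="<your-flow-id>",  # hypothetical placeholder
        parameters={"timestamp": day.isoformat()},
    )
    day = day.add(days=1)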
Bruno Murino
01/10/2022, 9:51 AM
WARNING:urllib3.connectionpool:Retrying (Retry(total=5, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)")': /
It got that warning many times; then the job was considered dead, etc.
Does anyone know why this could have happened?